Machine learning needs clean, steady data, and that starts with refactoring the old stuff we already ship every day.
Everyone is chasing models right now, from churn scorers to ad spend bid helpers, but the elephant in the room is the state of the data. With Apple’s privacy changes and Google pushing Topics instead of FLoC, the pressure to rely on first-party signals is real. If your app and site are still sending a patchwork of events with half-baked names and missing timestamps, a model will only memorize your mess. Start with machine learning readiness: choose a clear source of truth for every metric, enforce stable identifiers, pick one timezone, and make null behavior explicit. Store timestamps in UTC and convert at the edges. Name events with verbs and keep properties typed, with booleans for flags and numbers for amounts. Then write down the contract in plain words and keep it visible to both code and people.
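To make the contract idea concrete, here is a minimal sketch of what "visible to code" could look like. The `Event` class, the `parse_event` helper, and the `ALLOWED_NAMES` list are hypothetical names invented for illustration, not part of any library or of an existing tracking plan; swap in your own fields and rules.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Assumption: event names are verb-first and come from a fixed, reviewed list.
ALLOWED_NAMES = {"session_start", "purchase", "view_item"}


@dataclass(frozen=True)
class Event:
    event_id: str
    user_id: str
    name: str
    event_time: datetime      # timezone-aware, normalized to UTC
    consent: bool             # a real boolean, never "yes" or 1
    amount: Optional[float]   # None means "no amount"; it is not silently 0


def parse_event(raw: dict) -> Event:
    """Validate one raw payload against the contract or raise ValueError."""
    for key in ("event_id", "user_id", "name", "event_time", "consent"):
        if raw.get(key) is None:
            raise ValueError(f"missing required field: {key}")
    if raw["name"] not in ALLOWED_NAMES:
        raise ValueError(f"unknown event name: {raw['name']}")
    if not isinstance(raw["consent"], bool):
        raise ValueError("consent must be a boolean")
    if not isinstance(raw["event_time"], str):
        raise ValueError("event_time must be an ISO 8601 string")
    ts = datetime.fromisoformat(raw["event_time"])
    if ts.tzinfo is None:
        raise ValueError("event_time must carry a UTC offset")
    amount = raw.get("amount")
    if amount is not None:
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            raise ValueError("amount must be a number or null")
        amount = float(amount)
    return Event(
        event_id=str(raw["event_id"]),
        user_id=str(raw["user_id"]),
        name=raw["name"],
        event_time=ts.astimezone(timezone.utc),  # store UTC, convert at the edges
        consent=raw["consent"],
        amount=amount,
    )
```

Rejecting a bad payload at the door is a design choice: it is noisier on day one than coercing values, but it keeps the mess out of training data.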
Refactor step by step. Introduce data contracts between producers and consumers, and pair them with schema versioning so changes never break training or serving. Make pipelines idempotent and replayable, because you will need to backfill after a bug fix or when a new label arrives. Event sourcing with Kafka or change data capture (CDC) from your OLTP database is fine, as long as you keep ordering and dedupe rules written and tested. Add guardrails with dbt tests or Great Expectations and set thresholds that page a human when drift or volume goes off script. If you want a feature store, start small: a folder of documented queries that map to stable keys beats a shiny box that no one trusts. Use a schema registry if you stream, or at least freeze JSON shapes in a shared repo. Add a tiny canary table that stores daily row counts and a few percentiles so you can spot silent breaks.
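The canary table can be sketched in a few lines. This is an illustration under assumptions: `canary_row`, `volume_alert`, and the 50% tolerance are made-up names and numbers, and a real setup would write the row to a warehouse table and route the alert to whatever pages your team.

```python
import statistics
from datetime import date
from typing import List, Optional


def canary_row(dt: date, amounts: List[float]) -> dict:
    """Summarize one day's load: row count plus p50 and p95 of a numeric column."""
    p50: Optional[float]
    p95: Optional[float]
    if len(amounts) >= 2:
        # quantiles(n=100) returns 99 cut points; index 49 is p50, index 94 is p95.
        cuts = statistics.quantiles(amounts, n=100, method="inclusive")
        p50, p95 = cuts[49], cuts[94]
    else:
        p50 = p95 = amounts[0] if amounts else None
    return {"dt": dt, "row_count": len(amounts), "p50": p50, "p95": p95}


def volume_alert(history: List[dict], today: dict, tolerance: float = 0.5) -> bool:
    """True when today's row count strays from the trailing median by more than tolerance."""
    if not history:
        return False  # nothing to compare against yet
    baseline = statistics.median([r["row_count"] for r in history])
    if baseline == 0:
        return today["row_count"] > 0
    return abs(today["row_count"] - baseline) / baseline > tolerance
```

Comparing against a trailing median rather than yesterday alone keeps one odd day from resetting the baseline.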
For marketers this is not just plumbing. Strong first-party data and clean event tracking feed better attribution, creative ranking, and suppression lists as cookies fade. Move heavy work server-side, consider server-side GTM or direct streaming to BigQuery or Snowflake, and keep consent flags joined to every row. For engineers this makes model moves boring in the best way: Airflow or Dagster schedules, MLflow runs, and a small registry that tells serving where to fetch features for a given user or product. If you are on GCP or AWS, Vertex AI and SageMaker both reward clear inputs more than fancy tweaks. Before you change code, agree on ground truth for conversion, session, active user, and qualified lead, then encode them as tested views so training and reporting speak the same language. The shift from third-party cookies and ATT means your model work now depends on consented signals and clean joins more than ever. Do not try to boil the ocean on day one. Pick one revenue path and make it predictably boring before expanding.
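One way to keep training and reporting on the same definition is to route both through a single predicate and test that they agree. The sketch below is illustrative only: the rule "a conversion is a consented purchase with a positive amount" is an assumed definition, and `is_conversion`, `training_labels`, and `report_converting_users` are hypothetical names. In a warehouse the same idea would live in one tested view.

```python
from typing import Dict, List


def is_conversion(event: dict) -> bool:
    """Assumed ground truth: a consented purchase with a positive amount."""
    return (
        event.get("consent") is True
        and event.get("name") == "purchase"
        and (event.get("amount") or 0) > 0
    )


def training_labels(events: List[dict]) -> Dict[str, int]:
    """One label per consented user: 1 if they converted at least once, else 0."""
    labels: Dict[str, int] = {}
    for e in events:
        if e.get("consent") is not True:
            continue  # unconsented rows never reach training
        uid = e["user_id"]
        labels[uid] = max(labels.get(uid, 0), int(is_conversion(e)))
    return labels


def report_converting_users(events: List[dict]) -> int:
    """Reporting view: distinct users with at least one conversion."""
    return len({e["user_id"] for e in events if is_conversion(e)})
```

Because both functions call `is_conversion`, a change to the definition moves the dashboard and the labels together instead of letting them drift apart.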
Practical snippets
/* Build user_day_features with stable keys, event time, and replay support */
/* Source: raw_events with columns: event_id, user_id, event_time, ingested_at, name, amount, consent */
/* @start_ts should sit on a UTC day boundary: matched rows are overwritten, so a window that starts mid-day would replace that day's totals with a partial sum */
/* The CTEs live inside USING because BigQuery does not accept a WITH clause in front of MERGE */
merge into feature_store.user_day_features t
using (
  with deduped as (
    select
      * except(row_num)
    from (
      select
        event_id,
        user_id,
        timestamp(event_time) as event_time,
        timestamp(ingested_at) as ingested_at,
        name,
        cast(amount as float64) as amount,
        cast(consent as bool) as consent,
        /* keep the most recently ingested copy of each event */
        row_number() over (partition by event_id order by ingested_at desc) as row_num
      from raw_events
      /* half-open window, so back-to-back runs never share a boundary event */
      where event_time >= @start_ts and event_time < @end_ts
    )
    where row_num = 1
  ),
  clean as (
    select
      user_id,
      event_time,
      name,
      amount,
      consent
    from deduped
    where consent is true
  ),
  agg as (
    select
      user_id,
      date(event_time) as dt,
      sum(case when name = 'purchase' then amount else 0 end) as revenue,
      countif(name = 'session_start') as sessions
    from clean
    group by 1, 2
  )
  select user_id, dt, revenue, sessions from agg
) s
on t.user_id = s.user_id and t.dt = s.dt
when matched then update set revenue = s.revenue, sessions = s.sessions
when not matched then insert (user_id, dt, revenue, sessions) values (s.user_id, s.dt, s.revenue, s.sessions);

# Idempotent batch with watermark and simple data contract
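from typing import Tuple

import pandas as pd

# `client` is assumed to be your own thin warehouse wrapper exposing
# read(), read_events(), merge(), and write(); it is not a library class.
WATERMARK_TABLE = "meta.process_watermarks"
FEATURE_TABLE = "feature_store.user_day_features"


def read_watermark(client) -> Tuple[pd.Timestamp, pd.Timestamp]:
    row = client.read("select max(end_ts) as last from " + WATERMARK_TABLE)
    last = row["last"]
    if pd.isna(last):  # first run: no watermark yet
        last = pd.Timestamp("1970-01-01", tz="UTC")
    # Restart at the UTC day boundary so every touched day is rebuilt in full
    # and the merge can overwrite rows instead of incrementing them.
    start = pd.Timestamp(last).floor("D")
    end = pd.Timestamp.now(tz="UTC").floor("h")
    return start, end


def build_user_day_features(events: pd.DataFrame) -> pd.DataFrame:
    # Mirrors the SQL aggregation; assumes events are already deduped by event_id
    # and that event_time is a UTC datetime column.
    return (
        events.assign(
            dt=events["event_time"].dt.date,
            revenue=events["amount"].where(events["name"].eq("purchase"), 0.0),
            sessions=events["name"].eq("session_start").astype(int),
        )
        .groupby(["user_id", "dt"], as_index=False)[["revenue", "sessions"]]
        .sum()
    )


def validate(df: pd.DataFrame) -> None:
    # Contract: every row has a user, and no feature day lies in the future.
    if df["user_id"].isnull().any():
        raise ValueError("user_id must not be null")
    if (df["dt"] > pd.Timestamp.now(tz="UTC").date()).any():
        raise ValueError("dt must not be in the future")


def write_features(client, df: pd.DataFrame) -> None:
    client.merge(FEATURE_TABLE, df, keys=["user_id", "dt"])


def main(client) -> None:
    start, end = read_watermark(client)
    events = client.read_events(start, end)
    events = events.query("consent == True")
    feats = build_user_day_features(events)
    validate(feats)
    write_features(client, feats)
    # Advance the watermark only after the merge succeeds, so a failed run is retried.
    client.write(WATERMARK_TABLE, pd.DataFrame([{"end_ts": end}]))


if __name__ == "__main__":
    # bigquery_client() stands in for your own factory that returns the wrapper above.
    main(bigquery_client())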
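To see why the overwrite-by-key merge makes replays safe, it can help to run the same logic against an in-memory store before touching a warehouse. This is a toy sketch with made-up helper names (`dedupe`, `build_features`, `merge`) and a plain dict standing in for the feature table; it assumes each batch covers whole UTC days.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Tuple


def dedupe(events: List[dict]) -> List[dict]:
    """Keep the most recently ingested copy of each event_id."""
    latest: Dict[str, dict] = {}
    for e in events:
        kept = latest.get(e["event_id"])
        if kept is None or e["ingested_at"] > kept["ingested_at"]:
            latest[e["event_id"]] = e
    return list(latest.values())


def build_features(events: List[dict]) -> Dict[Tuple[str, object], dict]:
    """Aggregate consented events into one row per (user_id, UTC day)."""
    feats = defaultdict(lambda: {"revenue": 0.0, "sessions": 0})
    for e in dedupe(events):
        if e["consent"] is not True:
            continue
        key = (e["user_id"], e["event_time"].date())
        if e["name"] == "purchase":
            feats[key]["revenue"] += e["amount"] or 0.0
        elif e["name"] == "session_start":
            feats[key]["sessions"] += 1
    return dict(feats)


def merge(store: dict, feats: dict) -> None:
    """Upsert by key: overwrite, never increment, so a replay changes nothing."""
    store.update(feats)


def utc(hour: int, minute: int) -> datetime:
    return datetime(2024, 3, 1, hour, minute, tzinfo=timezone.utc)


batch = [
    {"event_id": "e1", "user_id": "u1", "name": "purchase", "amount": 20.0,
     "consent": True, "event_time": utc(9, 0), "ingested_at": utc(9, 1)},
    # Late correction of e1: same event_id, newer ingested_at, so it wins.
    {"event_id": "e1", "user_id": "u1", "name": "purchase", "amount": 25.0,
     "consent": True, "event_time": utc(9, 0), "ingested_at": utc(9, 5)},
    {"event_id": "e2", "user_id": "u1", "name": "session_start", "amount": None,
     "consent": True, "event_time": utc(8, 55), "ingested_at": utc(8, 56)},
    {"event_id": "e3", "user_id": "u2", "name": "purchase", "amount": 99.0,
     "consent": False, "event_time": utc(10, 0), "ingested_at": utc(10, 1)},
]

store: dict = {}
merge(store, build_features(batch))
first_run = {k: dict(v) for k, v in store.items()}
merge(store, build_features(batch))  # replay the same window
```

After the replay, `store` holds the same rows as `first_run`. An incrementing merge would have doubled the revenue instead.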