---
name: event-pipeline
description: >-
  Event pipeline patterns for 20B events/day including Protobuf schemas,
  telemetry ingestion, Pub/Sub partitioning, Flink streaming jobs, BigQuery
  tables, Redis deduplication, and backpressure handling. Use when working
  with telemetry, analytics, or the AdTech data flow.
---

# Event Pipeline Skill

## Instructions

When users work with telemetry events, analytics, feature computation, or AdTech data:

1. Reference the architecture document Sections 4, 7.4, and 10 for implementations
2. Follow the Go patterns for the telemetry-ingestion service
3. Use Protobuf for event serialization (not JSON -- wire efficiency matters at scale)
4. Apply deduplication before Pub/Sub publish
5. Respect the AI/Ad firewall -- conversation content MUST NOT reach the ad pipeline

## Scale Metrics

| Metric | Value | Notes |
|--------|-------|-------|
| Daily events | 20B | All browser telemetry + ad events |
| Avg throughput | ~231K events/sec | 20B / 86,400s |
| Peak throughput | 700K-1.15M events/sec | 3-5x during US business hours |
| Raw data volume | ~10 TB/day | 20B events x ~500 bytes avg (Protobuf encoding) |
| End-to-end latency | p50 < 2s, p99 < 8s | From ingestion until queryable in BigQuery |
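
The throughput and volume rows follow from the daily event count. A minimal sketch of that arithmetic, assuming decimal terabytes (1 TB = 1e12 bytes); the function names are illustrative and not part of the pipeline code:

```go
package main

import "fmt"

// Inputs taken from the table above.
const (
	dailyEvents   = 20e9    // 20B events/day
	secondsPerDay = 86400.0 // 24 * 60 * 60
	avgEventBytes = 500.0   // approximate Protobuf-encoded event size
)

// AvgEventsPerSec is the mean ingest rate implied by the daily volume.
func AvgEventsPerSec() float64 { return dailyEvents / secondsPerDay }

// PeakEventsPerSec scales the mean by a peak multiplier (3-5x in the table).
func PeakEventsPerSec(multiplier float64) float64 {
	return AvgEventsPerSec() * multiplier
}

// RawTBPerDay is the daily raw volume in decimal terabytes (assumed unit).
func RawTBPerDay() float64 { return dailyEvents * avgEventBytes / 1e12 }

func main() {
	fmt.Printf("avg:  %.0f events/sec\n", AvgEventsPerSec())
	fmt.Printf("peak: %.0f to %.0f events/sec\n", PeakEventsPerSec(3), PeakEventsPerSec(5))
	fmt.Printf("raw:  %.1f TB/day\n", RawTBPerDay())
}
```

The same helpers can be reused for capacity planning by swapping in a different daily volume or peak multiplier.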

## Pipeline Flow

```
Browser (Chromium)
  -> Local Event Buffer (5s / 100 events / 64KB -- whichever first)
  -> POST /v1/events/batch (Protobuf binary, gzip, mTLS)
  -> Telemetry Ingestion (Go, 12 pods HPA) -- validate + dedup
  -> Cloud Pub/Sub (200 partitions, ordered by user_id hash, 7d retention)
  -> Flink "features" (8 TMs) -> Aerospike (feature store, p99 < 1ms)
  -> Flink "sessions" (4 TMs) -> BigQuery (sessions table)
  -> BigQuery Subscription (Storage Write API) -> BigQuery (raw events)
```
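
The "whichever first" rule for the local event buffer can be sketched as a single predicate. This is an illustration only, written in Go for consistency with the ingestion service rather than in the browser's own language. `BatchState` and `ShouldFlush` are hypothetical names, and 64KB is assumed to mean 64 x 1024 bytes:

```go
package main

import (
	"fmt"
	"time"
)

// Flush thresholds taken from the pipeline flow above.
const (
	maxAge    = 5 * time.Second
	maxEvents = 100
	maxBytes  = 64 * 1024 // assumption: 64KB means 65,536 bytes
)

// BatchState is a hypothetical snapshot of the local event buffer.
type BatchState struct {
	Events    int           // number of buffered events
	Bytes     int           // encoded size of the buffered events
	OldestAge time.Duration // age of the oldest buffered event
}

// ShouldFlush reports whether any threshold has been reached and names it.
// An empty buffer never flushes.
func ShouldFlush(s BatchState) (bool, string) {
	switch {
	case s.Events == 0:
		return false, ""
	case s.Events >= maxEvents:
		return true, "count"
	case s.Bytes >= maxBytes:
		return true, "size"
	case s.OldestAge >= maxAge:
		return true, "age"
	}
	return false, ""
}

func main() {
	s := BatchState{Events: 42, Bytes: 70_000, OldestAge: 2 * time.Second}
	ok, reason := ShouldFlush(s)
	fmt.Println(ok, reason)
}
```

Returning the trigger name alongside the decision makes it easy to emit a metric per flush reason, which helps when tuning the three thresholds.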

## Protobuf Schema

**Top-level envelope: `BrowserEvent`**
- `event_id`: `evt_{ulid}` (globally unique, client-generated)
- `user_id`: `usr_{uuid_v4}` (anonymous, rotated on "clear data")
- `device_id`: Per-installation (survives profile resets)
- `session_id`: Monotonic counter (30min inactivity gap = new session)
- `payload`: oneof -- PageView, SearchEvent, AIInteraction, Navigation, Performance, AdEvent

**Event types:**

| Type | Key Fields |
|------|-----------|
| PageView | url, title, referrer, load_time_ms, scroll_depth_pct |
| SearchEvent | query, engine, result_count, clicked_position |
| AIInteraction | conversation_id, task_type, model, input/output_tokens, latency_ms, cached |
| Navigation | url_from, url_to, trigger (link/typed/bookmark/back/ai_suggest) |
| Performance | metric_name (FCP/LCP/CLS/FID/TTFB/INP), value, url |
| AdEvent | ad_id, campaign_id, placement, action (impression/viewable/click/conversion), revenue_micros |
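
The envelope ID formats lend themselves to a cheap check in the ingestion validation step. A minimal sketch, assuming uppercase canonical ULIDs and lowercase hyphenated UUIDs (the source does not specify case); `ValidEventID` and `ValidUserID` are hypothetical helpers:

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// ULID: 26 Crockford base32 characters (no I, L, O, U). The first
	// character is 0-7 because a ULID carries 128 bits.
	eventIDRe = regexp.MustCompile(`^evt_[0-7][0-9A-HJKMNP-TV-Z]{25}$`)
	// UUID v4: version nibble is 4, variant nibble is 8, 9, a, or b.
	userIDRe = regexp.MustCompile(`^usr_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$`)
)

// ValidEventID checks the evt_{ulid} shape.
func ValidEventID(id string) bool { return eventIDRe.MatchString(id) }

// ValidUserID checks the usr_{uuid_v4} shape.
func ValidUserID(id string) bool { return userIDRe.MatchString(id) }

func main() {
	fmt.Println(ValidEventID("evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"))
	fmt.Println(ValidUserID("usr_f47ac10b-58cc-4372-a567-0e02b2c3d479"))
	fmt.Println(ValidUserID("usr_not-a-uuid"))
}
```

This only checks shape; it says nothing about whether the ID was actually issued by a client.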

## Deduplication

Deduplication runs at the ingestion layer using Redis `SETNX` on fingerprints bucketed into 1-second windows.

**Fingerprint**: `SHA256(user_id + event_type_name + timestamp_truncated_to_1s)`[:128 bits]

**Key design decisions:**
- 5-minute TTL covers worst-case client retry storms (3 retries x 30s backoff x 2x jitter = 180s)
- Key prefix: `dedup:{fingerprint}`
- **Fail open**: on Redis failure, let event through (rare duplicate in BQ is harmless)
- 1-second buckets assume a user cannot generate two distinct events of the same type within one second, so any bucket collision is treated as a duplicate
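
The fingerprint and fail-open behavior can be sketched as follows. This is not the service's actual code: `SetNXer` is a hypothetical interface standing in for a Redis client, `memStore` and `downStore` are in-memory test doubles, the `|` field separator is an assumption (the source only says the fields are concatenated), and event time is assumed to arrive as Unix milliseconds:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"time"
)

const dedupTTL = 5 * time.Minute

// SetNXer is a hypothetical abstraction over Redis "SET key value NX" with a TTL.
// It returns true when the key did not exist and was created.
type SetNXer interface {
	SetNX(key string, ttl time.Duration) (bool, error)
}

// Fingerprint hashes user, event type, and the 1-second bucket, then keeps
// the first 128 bits (16 bytes) of the SHA-256 digest, hex-encoded.
func Fingerprint(userID, eventType string, eventTimeMs int64) string {
	bucket := strconv.FormatInt(eventTimeMs/1000, 10) // truncate to whole seconds
	sum := sha256.Sum256([]byte(userID + "|" + eventType + "|" + bucket))
	return hex.EncodeToString(sum[:16])
}

// IsDuplicate fails open: if the store errors, the event is let through.
func IsDuplicate(store SetNXer, userID, eventType string, eventTimeMs int64) bool {
	created, err := store.SetNX("dedup:"+Fingerprint(userID, eventType, eventTimeMs), dedupTTL)
	if err != nil {
		return false // fail open: a rare duplicate downstream is acceptable
	}
	return !created
}

// memStore is an in-memory stand-in for Redis; it ignores the TTL.
type memStore struct{ seen map[string]bool }

func (m *memStore) SetNX(key string, _ time.Duration) (bool, error) {
	if m.seen[key] {
		return false, nil
	}
	m.seen[key] = true
	return true, nil
}

// downStore simulates a Redis outage.
type downStore struct{}

func (downStore) SetNX(string, time.Duration) (bool, error) {
	return false, errors.New("redis unavailable")
}

func main() {
	store := &memStore{seen: map[string]bool{}}
	fmt.Println(IsDuplicate(store, "usr_a", "PageView", 1700000000123))       // first sighting
	fmt.Println(IsDuplicate(store, "usr_a", "PageView", 1700000000900))       // same 1s bucket
	fmt.Println(IsDuplicate(downStore{}, "usr_a", "PageView", 1700000000900)) // store down
}
```

Keeping the store behind a small interface lets the fail-open path be exercised in unit tests without a running Redis instance.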

## BigQuery Tables

### browser_events (raw)
- Partitioned by `DATE(event_time)`, clustered by `user_id, event_type`
- Denormalized: payload fields are flattened to avoid JOINs when scanning ~10 TB/day
- 90-day partition expiration, `require_partition_filter = true`

### ai_usage (aggregated)
- Materialized every 15 minutes from browser_events
- Partitioned by `DATE(created_at)`, clustered by `user_id, model`
- 2-year retention (billing compliance)

### user_sessions (Flink-built)
- A session runs from its first event until a 30-minute inactivity gap
- Entry/exit URL, event counts, token totals, revenue totals
- 1-year retention
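
The 30-minute gap rule can be illustrated with a small batch version of the grouping. This is a sketch, not the Flink job: it assumes timestamps in Unix milliseconds for a single user. It also assumes that an event arriving exactly 30 minutes after the previous one starts a new session, which matches how gap-based session windows usually behave but is not stated in the source. `Session` and `Sessionize` are hypothetical names:

```go
package main

import (
	"fmt"
	"sort"
)

const sessionGapMs = 30 * 60 * 1000 // 30 minutes of inactivity

// Session is a hypothetical summary of one session for a single user.
type Session struct {
	StartMs, EndMs int64
	Events         int
}

// Sessionize groups one user's event timestamps (Unix ms) into sessions.
// A new session starts when the gap since the previous event is >= 30 minutes.
func Sessionize(timestampsMs []int64) []Session {
	ts := append([]int64(nil), timestampsMs...) // copy so the input is not reordered
	sort.Slice(ts, func(i, j int) bool { return ts[i] < ts[j] })

	var out []Session
	for _, t := range ts {
		if n := len(out); n > 0 && t-out[n-1].EndMs < sessionGapMs {
			out[n-1].EndMs = t // extend the current session
			out[n-1].Events++
			continue
		}
		out = append(out, Session{StartMs: t, EndMs: t, Events: 1})
	}
	return out
}

func main() {
	// Two events 10 minutes apart, then two more after a 30-minute pause.
	for _, s := range Sessionize([]int64{0, 600_000, 2_400_000, 2_400_001}) {
		fmt.Printf("%+v\n", s)
	}
}
```

A streaming job additionally has to handle out-of-order and late events, which this batch sketch sidesteps by sorting up front.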

## Backpressure Chain

| Layer | Mechanism |
|-------|-----------|
| Browser SDK | 1,000-event buffer, exponential backoff on 429/503, drop oldest non-ad event on overflow |
| Telemetry Ingestion | HPA on Pub/Sub backlog (target < 10K), 50K events/sec per-pod rate limit |
| Pub/Sub | 7-day retention, DLT after 5 delivery attempts, DLT -> GCS archive |
| Flink | 30s checkpoints to GCS (incremental RocksDB), fixed-delay restart (3 attempts, 10s) |
| BigQuery | Committed mode (exactly-once), 500 rows/5s batching, spill to GCS on quota exceeded |
| Aerospike | COMMIT_ALL write policy, retry once on timeout, skip on persistent failure |
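
The Browser SDK overflow policy (drop the oldest non-ad event first) can be sketched as a bounded FIFO. The SDK itself is not written in Go; this only illustrates the eviction rule. `Event` and `Buffer` are hypothetical types, and falling back to the oldest event when only ad events are buffered is an assumption the source does not cover:

```go
package main

import "fmt"

// Event is a minimal hypothetical stand-in for a buffered BrowserEvent.
type Event struct {
	ID   string
	IsAd bool
}

// Buffer is a bounded FIFO that prefers to evict non-ad events on overflow.
type Buffer struct {
	capacity int
	events   []Event
}

// NewBuffer creates a buffer holding at most capacity events.
func NewBuffer(capacity int) *Buffer { return &Buffer{capacity: capacity} }

// Len returns the number of buffered events.
func (b *Buffer) Len() int { return len(b.events) }

// Push appends e. When the buffer is full it first evicts the oldest non-ad
// event and returns it; if every buffered event is an ad event, it evicts the
// oldest event overall (assumed fallback).
func (b *Buffer) Push(e Event) (dropped *Event) {
	if n := len(b.events); n > 0 && n >= b.capacity {
		idx := 0 // fallback: oldest event overall
		for i, ev := range b.events {
			if !ev.IsAd {
				idx = i
				break
			}
		}
		victim := b.events[idx]
		dropped = &victim
		b.events = append(b.events[:idx], b.events[idx+1:]...)
	}
	b.events = append(b.events, e)
	return dropped
}

func main() {
	b := NewBuffer(3)
	b.Push(Event{ID: "ad1", IsAd: true})
	b.Push(Event{ID: "p1"})
	b.Push(Event{ID: "p2"})
	if d := b.Push(Event{ID: "p3"}); d != nil {
		fmt.Println("dropped:", d.ID)
	}
}
```

The linear scan is fine at a 1,000-event capacity; a larger buffer might keep separate queues for ad and non-ad events instead.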

## AI/Ad Firewall (Section 7.4)

**Critical privacy boundary.** AI conversation content must NEVER reach the ad pipeline.

**Two-stage filter:**
1. `AIAdFirewall` (FilterFunction): drops pure AI content events entirely
2. `SanitizeForAdPipeline` (MapFunction): strips non-allowlisted fields

**Allowlisted for ad pipeline:** event_id, user_id, event_type, timestamp, session_id, domain (top-level only), category, dwell_time_ms, device_type, os, country, region

**BLOCKED fields:** prompt, response, conversation_history, page_content, selected_text, url_path, search_query_text, form_data, clipboard, email_content
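
The two-stage filter is easiest to reason about as an allowlist: anything not explicitly permitted is dropped, so the blocked fields never need to be enumerated in code. A minimal sketch in Go (the real filter is described as Flink functions). Treating `AIInteraction` as the "pure AI content" event type and modeling events as flat string maps are both assumptions:

```go
package main

import "fmt"

// adAllowlist mirrors the allowlisted fields listed above.
var adAllowlist = map[string]bool{
	"event_id": true, "user_id": true, "event_type": true, "timestamp": true,
	"session_id": true, "domain": true, "category": true, "dwell_time_ms": true,
	"device_type": true, "os": true, "country": true, "region": true,
}

// PassesFirewall is stage 1: drop pure AI content events entirely.
// Assumption: AIInteraction is the only such event type.
func PassesFirewall(eventType string) bool {
	return eventType != "AIInteraction"
}

// SanitizeForAd is stage 2: copy only allowlisted fields into a new map,
// so unknown or newly added fields are excluded by default.
func SanitizeForAd(fields map[string]string) map[string]string {
	out := make(map[string]string, len(adAllowlist))
	for k, v := range fields {
		if adAllowlist[k] {
			out[k] = v
		}
	}
	return out
}

func main() {
	ev := map[string]string{
		"event_type": "PageView",
		"user_id":    "usr_1",
		"domain":     "example.com",
		"url_path":   "/checkout/cart", // blocked
		"prompt":     "secret",         // blocked
	}
	if PassesFirewall(ev["event_type"]) {
		fmt.Println(SanitizeForAd(ev))
	}
}
```

Defaulting to exclusion means a schema change that adds a sensitive field cannot leak into the ad pipeline until someone deliberately allowlists it.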

## Feature Computation (Section 10.2)

**Purchase Intent Processor** (Flink keyed by user_id):
- Signal weights: product_page_view (3.0), add_to_cart (10.0), price_comparison (5.0), checkout_started (15.0), coupon_search (7.0)
- Time decay: exponential with 6-hour half-life
- Score capped at 100, stale categories cleaned when < 0.01
- Throttled writes to Aerospike: at most every 30s per user
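
The scoring rules above can be sketched as three small functions. This is an illustration under assumptions rather than the Flink processor itself: the existing score is decayed before the new signal weight is added (the order is not stated in the source), elapsed time is passed in hours, and the function names are hypothetical:

```go
package main

import (
	"fmt"
	"math"
)

const (
	halfLifeHours = 6.0   // exponential decay half-life
	maxScore      = 100.0 // score cap
	staleBelow    = 0.01  // categories below this are cleaned up
)

// signalWeights mirrors the weights listed above.
var signalWeights = map[string]float64{
	"product_page_view": 3.0,
	"add_to_cart":       10.0,
	"price_comparison":  5.0,
	"checkout_started":  15.0,
	"coupon_search":     7.0,
}

// Decay applies exponential decay: score * 0.5^(elapsed / half-life).
func Decay(score, elapsedHours float64) float64 {
	return score * math.Pow(0.5, elapsedHours/halfLifeHours)
}

// Update decays the previous score, adds the signal weight, and caps at 100.
// Unknown signals contribute zero.
func Update(score, elapsedHours float64, signal string) float64 {
	return math.Min(maxScore, Decay(score, elapsedHours)+signalWeights[signal])
}

// IsStale reports whether a category score is low enough to be removed.
func IsStale(score float64) bool { return score < staleBelow }

func main() {
	score := Update(0, 0, "add_to_cart")        // first signal
	score = Update(score, 6, "coupon_search")   // one half-life later
	fmt.Printf("score after two signals: %.2f\n", score)
	fmt.Println("stale after 72h:", IsStale(Decay(score, 72)))
}
```

With a 6-hour half-life, a score loses about 94% of its value after 24 hours, so intent signals older than a day contribute very little.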

**Aerospike feature store:**
- Namespace: `user_features`, Set: `realtime`
- Key: anonymized user_id (SHA256 of device_id + rotation_salt, rotated every 90 days)
- Bins: interest_vector (128-dim), purchase_intent (per-category), session metrics, time patterns
- TTL: 30 days, Replication: 2x across 3-node cluster

## Debugging

### Events Not Appearing in BigQuery
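- Check Pub/Sub subscription backlog -- a growing backlog on the BigQuery subscription means raw-event writes are stalling; on the Flink subscriptions it indicates consumer lag (affects `user_sessions`)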
- Verify BQ Storage Write API quotas (check for spill-to-GCS)
- Confirm partition filter in queries (`WHERE event_time >= ...`)

### High Dedup Rate (> 20%)
- Usually indicates a client bug (the same events are being sent repeatedly)
- Check browser SDK batch_id and seq_number for gaps
- Verify client retry backoff is working correctly

### Flink Checkpoint Failures
- Check GCS write permissions for checkpoint state backend
- Monitor RocksDB state size -- may need more TaskManager memory
- Verify Pub/Sub committed offsets are advancing

### Feature Store Stale
- Check Flink "features" job health and checkpoint duration
- Verify Aerospike write policy (COMMIT_ALL) is not timing out
- Monitor the 30s write throttle -- may need adjustment at scale
