Time-Series Patterns
Best practices for analytics data models.
A production-grade analytics backend handling millions of events from mobile apps.
Repository: github.com/fraiseql/examples/mobile-analytics-backend
-- Registered mobile applications. Each app authenticates event ingestion
-- with its unique api_key (see track_events).
CREATE TABLE tb_app (
    pk_app     SERIAL PRIMARY KEY,
    id         UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),  -- public-facing id
    identifier TEXT NOT NULL UNIQUE,
    name       TEXT NOT NULL,
    api_key    TEXT NOT NULL UNIQUE,   -- ingestion credential
    owner_id   UUID NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now()
);
-- Raw event stream ingested from mobile apps (millions of rows; hot table).
CREATE TABLE tb_event (
    pk_event   SERIAL PRIMARY KEY,
    id         UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    fk_app     INTEGER NOT NULL REFERENCES tb_app(pk_app),
    event_type TEXT NOT NULL,
    user_id    UUID,                        -- nullable: anonymous events allowed
    session_id TEXT NOT NULL,
    device     JSONB NOT NULL DEFAULT '{}', -- device info captured at ingestion
    metadata   JSONB NOT NULL DEFAULT '{}', -- free-form per-event payload
    -- NOTE(review): "timestamp" is also a SQL type name; kept for compatibility
    -- with existing queries, but a name like occurred_at would be less error-prone.
    timestamp  TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Retention, cleanup, and dashboard queries in this example all filter by app
-- and time range; without this index every such query is a sequential scan on
-- the largest table in the system.
CREATE INDEX tb_event_fk_app_timestamp_idx ON tb_event (fk_app, timestamp);
-- Pre-aggregated daily rollups: one row per (app, metric, dimension, day).
-- The UNIQUE constraint is the ON CONFLICT target for upsert increments
-- (see update_live_metrics).
CREATE TABLE tb_metric (
    pk_metric   SERIAL PRIMARY KEY,
    id          UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    fk_app      INTEGER NOT NULL REFERENCES tb_app(pk_app),
    metric_type TEXT NOT NULL,
    value       BIGINT NOT NULL DEFAULT 0,  -- counter, incremented on upsert
    dimension   TEXT NOT NULL,              -- e.g. device OS in the rollup code
    date        DATE NOT NULL,              -- rollup calendar day
    created_at  TIMESTAMPTZ DEFAULT now(),
    UNIQUE (fk_app, metric_type, dimension, date)
);
-- Saved user segments per app. `filter` holds the segment definition as JSONB.
CREATE TABLE tb_cohort (
    pk_cohort  SERIAL PRIMARY KEY,
    id         UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    identifier TEXT NOT NULL UNIQUE,
    fk_app     INTEGER NOT NULL REFERENCES tb_app(pk_app),
    name       TEXT NOT NULL,
    filter     JSONB NOT NULL DEFAULT '{}', -- cohort membership criteria
    user_count INT NOT NULL DEFAULT 0,      -- denormalized; maintenance job not shown here
    created_at TIMESTAMPTZ DEFAULT now()
);
-- Views with JSONB data column (FraiseQL convention: the `data` column feeds
-- the GraphQL type). Original text had line breaks stripped ("ASSELECT",
-- "dataFROM"), which is invalid SQL; reconstructed here.
CREATE VIEW v_app AS
SELECT
    a.id,
    a.identifier,
    jsonb_build_object(
        'id', a.id::text,
        'name', a.name,
        'owner_id', a.owner_id::text,
        'created_at', a.created_at
    ) AS data
FROM tb_app a;
-- JSONB view over the metric rollups; fk_app is exposed alongside `data`
-- so queries can filter by app without unpacking the JSONB.
-- (Reconstructed: the original had newlines stripped, fusing SQL with the
-- Python import block that follows.)
CREATE VIEW v_metric AS
SELECT
    m.id,
    jsonb_build_object(
        'id', m.id::text,
        'metric_type', m.metric_type,
        'value', m.value,
        'dimension', m.dimension,
        'date', m.date
    ) AS data,
    m.fk_app
FROM tb_metric m;

import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
@fraiseql.type
class App:
    """GraphQL type backed by v_app's JSONB `data` column.

    Note: the original text fused the decorator and class statement
    ("@fraiseql.typeclass App:"); reconstructed as @fraiseql.type + class.
    """
    id: ID
    name: str
    owner_id: ID
    created_at: datetime
@fraiseql.type
class Event:
    """GraphQL type for raw events (mirrors tb_event's exposed columns)."""
    id: ID
    event_type: str
    user_id: ID | None  # anonymous events have no user
    session_id: str
    device: dict        # JSONB passthrough
    metadata: dict      # JSONB passthrough
    timestamp: datetime
@fraiseql.type
class Metric:
    """GraphQL type backed by v_metric's JSONB `data` column."""
    id: ID
    metric_type: str
    value: int
    dimension: str
    # NOTE(review): the underlying column is DATE, not TIMESTAMPTZ — a date
    # type would be more precise here; kept as datetime for compatibility.
    date: datetime
    created_at: datetime
@fraiseql.type
class Cohort:
    """GraphQL type for saved user segments (tb_cohort)."""
    id: ID
    name: str
    filter: dict  # JSONB cohort membership criteria
    user_count: int
    created_at: datetime
@fraiseql.query(sql_source="v_metric")
def metrics(app_id: ID, date_from: str, date_to: str) -> list[Metric]:
    """Get metrics for date range."""
    pass


# Batch event ingestion (hundreds of events at once):
#
# mutation {
#   trackEvents(
#     appId: "app123"
#     events: [
#       { type: "page_view", path: "/home", metadata: { referrer: "/search" } },
#       { type: "button_click", label: "Sign Up", metadata: { screen: "onboarding" } },
#       { type: "screen_time", duration: 45000 }
#     ]
#   ) {
#     success
#     eventCount
#   }
# }


@fraiseql.mutation
async def track_events(info, app_id: str, events: list[dict]) -> dict:
    """Ingest batch of events from mobile app (optimized for throughput).

    Args:
        info: GraphQL resolver info; carries the request (API key, user,
            session, device headers) and the db/context handle.
        app_id: public UUID of the app (tb_app.id).
        events: raw event dicts from the client; each may carry 'type' and
            'metadata' keys.

    Returns:
        {'success': bool, 'event_count': int}

    Raises:
        Exception: when the app id / API key pair does not match.
    """
    api_key = extract_api_key_from_header(info)

    # NOTE(review): `ctx` was used without being defined in the original;
    # assumed to be the request context carried on `info` — confirm against
    # the FraiseQL context API.
    ctx = info.context

    # Verify API key and resolve the internal pk for the app.
    app = await ctx.db.query_one(
        "SELECT pk_app, id FROM tb_app WHERE id = $1 AND api_key = $2",
        [app_id, api_key],
    )
    if not app:
        raise Exception("Invalid app or API key")

    user_id = extract_user_id_from_request(info)
    session_id = extract_session_id_from_request(info)

    # Normalize events with defaults. One timestamp and one device snapshot
    # per batch (hoisted: both are invariant across the comprehension).
    # datetime.utcnow() is deprecated in Python 3.12+; prefer
    # datetime.now(timezone.utc) once the module imports timezone.
    received_at = datetime.utcnow()
    device_info = extract_device_info(info)
    normalized_events = [
        {
            'fk_app': app['pk_app'],
            # 'unknown' default matches update_live_metrics and prevents a
            # NOT NULL violation on tb_event.event_type for untyped events.
            'event_type': event.get('type', 'unknown'),
            'user_id': user_id,
            'session_id': session_id,
            'device': device_info,
            'metadata': event.get('metadata', {}),
            'timestamp': received_at,
        }
        for event in events
    ]

    # Single batched insert for throughput (one round trip per batch,
    # not per event). `json` is expected to be imported at module level.
    await ctx.db.execute_many(
        """
        INSERT INTO tb_event
            (fk_app, event_type, user_id, session_id, device, metadata, timestamp)
        VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7)
        """,
        [
            (
                e['fk_app'], e['event_type'], e['user_id'], e['session_id'],
                json.dumps(e['device']), json.dumps(e['metadata']), e['timestamp'],
            )
            for e in normalized_events
        ],
    )

    # Publish events to NATS for real-time processing (one publish per event;
    # batching here would cut broker round trips if throughput demands it).
    for event in normalized_events:
        await fraiseql.nats.publish(f"app:{app_id}:event", event)

    # Update rolling aggregations off the request path.
    # NOTE(review): the task handle is discarded — asyncio may garbage-collect
    # a task nobody references; keep a handle if dropped rollups are observed.
    asyncio.create_task(update_live_metrics(app['pk_app'], events))

    return {'success': True, 'event_count': len(events)}


# Get live metrics for dashboard:
#
# query {
#   metrics(
#     appId: "app123"
#     dateRange: { from: "2024-01-01", to: "2024-01-31" }
#   ) {
#     metric_type
#     value
#     dimension
#     date
#   }
# }
# Get user cohorts
query {
  cohorts(appId: "app123", limit: 20) {
    id
    name
    userCount
    retentionRate
  }
}
# Get funnels (multi-step user journeys):
#
# query {
#   funnel(
#     appId: "app123"
#     steps: ["onboarding", "payment_screen", "purchase_complete"]
#   ) {
#     step
#     users
#     dropoffPercent
#   }
# }
#
# Subscribe to live dashboard metrics:
#
# subscription {
#   liveMetrics(appId: "app123", interval: 60) {
#     timestamp
#     pageviews
#     uniqueUsers
#     crashCount
#     avgSessionDuration
#   }
# }


async def update_live_metrics(fk_app: int, events: list[dict]) -> None:
    """Roll up events into metrics (daily, weekly, monthly).

    Args:
        fk_app: internal app key (tb_app.pk_app).
        events: raw client event dicts (same shape track_events receives).
    """
    # NOTE(review): `ctx` is not defined in this module-level coroutine —
    # presumably a module-global db context; confirm how FraiseQL exposes it
    # outside a resolver.
    for event in events:
        event_type = event.get('type', 'unknown')
        device = event.get('device', {})
        dimension = device.get('os', 'unknown')  # rollup dimension = device OS
        today = datetime.utcnow().date()

        # Atomic upsert: the UNIQUE(fk_app, metric_type, dimension, date)
        # constraint is the conflict target, so concurrent workers increment
        # safely without a SELECT-then-INSERT race.
        # NOTE(review): one statement per event — batching these per
        # (metric_type, dimension) would cut round trips for large batches.
        await ctx.db.execute(
            """
            INSERT INTO tb_metric (fk_app, metric_type, dimension, value, date)
            VALUES ($1, $2, $3, 1, $4)
            ON CONFLICT (fk_app, metric_type, dimension, date)
            DO UPDATE SET value = tb_metric.value + 1
            """,
            [fk_app, f"{event_type}_count", dimension, today],
        )


@fraiseql.query
async def cohort_retention(info, app_id: str, cohort_id: str, interval: str = "day") -> list[dict]:
    """Calculate retention over time for a cohort.

    Returns one bucket per week over a 30-day window:
        {'day_offset': int, 'date': date, 'returning_users': int}
    """
    ctx = info.context  # NOTE(review): `ctx` was undefined in the original; confirm.

    cohort = await ctx.db.query_one(
        "SELECT filter FROM tb_cohort WHERE id = $1", [cohort_id]
    )
    # Fail fast on an unknown cohort instead of silently computing app-wide numbers.
    if not cohort:
        raise Exception("Cohort not found")
    # NOTE(review): cohort['filter'] is fetched but never applied below — the
    # query measures retention over ALL app users, not the cohort. TODO: apply
    # the filter criteria to the user set.

    retention_data = []
    for days_offset in range(0, 30, 7):  # weekly buckets over a 30-day window
        cohort_date = datetime.utcnow().date() - timedelta(days=days_offset)

        # Users first seen on cohort_date who produced any event on or after it.
        # Range predicates replace the original DATE(timestamp) = $2 /
        # DATE(timestamp) >= $2 (equivalent under the session time zone, but
        # SARGable, so the (fk_app, timestamp) index can be used).
        returning_users = await ctx.db.query(
            """
            SELECT DISTINCT user_id
            FROM tb_event
            WHERE fk_app = (SELECT pk_app FROM tb_app WHERE id = $1)
              AND user_id IN (
                  SELECT DISTINCT user_id
                  FROM tb_event
                  WHERE timestamp >= $2
                    AND timestamp < $2 + INTERVAL '1 day'
                    AND fk_app = (SELECT pk_app FROM tb_app WHERE id = $1)
              )
              AND timestamp >= $2
            LIMIT 10000
            """,
            [app_id, cohort_date],
        )

        retention_data.append({
            'day_offset': days_offset,
            'date': cohort_date,
            'returning_users': len(returning_users),
        })

    return retention_data


# Delete old events (older than 90 days) automatically
@fraiseql.scheduled_task(cron="0 2 * * *")  # Daily at 2 AM
async def cleanup_old_events():
    """Archive events older than 90 days to S3, then delete them."""
    ninety_days_ago = datetime.utcnow() - timedelta(days=90)

    # NOTE(review): loads every expired row into memory at once; for a table
    # of this volume, stream/paginate (or use COPY) before this runs at scale.
    old_events = await ctx.db.query(
        "SELECT * FROM tb_event WHERE timestamp < $1", [ninety_days_ago]
    )

    if old_events:
        await s3_archive.put(
            f"events/archive-{ninety_days_ago.date()}.json",
            json.dumps(old_events, default=str),
        )

        # Delete from hot database only after the archive write succeeded,
        # so a failed upload never loses data. Events ingested between the
        # SELECT and DELETE are newer than the cutoff and are unaffected.
        await ctx.db.execute(
            "DELETE FROM tb_event WHERE timestamp < $1", [ninety_days_ago]
        )

# Clone the example
# Setup commands (reconstructed: originals had commands fused with the
# headings that followed them, so they were not runnable as written).

# Clone the example
git clone https://github.com/fraiseql/examples/mobile-analytics-backend
cd mobile-analytics-backend

# Set up environment
cp .env.example .env
uv sync

# Start services (includes TimescaleDB for time-series)
docker-compose up -d postgres redis nats

# Run migrations
confiture migrate

# Start FraiseQL server
fraiseql run

# Generate test events
python scripts/generate_test_events.py

# Time-Series Patterns
Best practices for analytics data models.
Performance Tuning
Optimize high-throughput ingestion pipelines.
Scaling Guide
Horizontal scaling for millions of events.
Caching Strategy
Redis caching for aggregated metrics.