Skip to content

Mobile Analytics Backend

A production-grade analytics backend handling millions of events from mobile apps.

Repository: github.com/fraiseql/examples/mobile-analytics-backend

  • High-Volume Data Ingestion: Batch event ingestion from mobile clients
  • Time-Series Data: Analytics stored with timestamps for trend analysis
  • Aggregations: Roll-up analytics (daily, weekly, monthly)
  • Multi-App Support: Serve multiple mobile apps from one backend
  • Real-Time Dashboards: Live metrics via subscriptions
  • Data Retention: Automatic cleanup of old data
  • Performance: Handle 100K+ events/second
  • Cost Optimization: Efficient storage and querying
-- Registered mobile applications. Each app authenticates ingestion calls with
-- its api_key. pk_app is the internal surrogate key used by foreign keys;
-- `id` is the public UUID exposed through the GraphQL API.
CREATE TABLE tb_app (
pk_app SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, -- public identifier
identifier TEXT NOT NULL UNIQUE, -- human-readable slug
name TEXT NOT NULL,
api_key TEXT NOT NULL UNIQUE, -- secret presented by mobile clients
owner_id UUID NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
-- Raw analytics events (hot, high-volume, time-series). Rows older than 90
-- days are archived to S3 and deleted by the scheduled cleanup task.
-- NOTE(review): no index on (fk_app, timestamp) is visible in this chunk,
-- yet queries filter on both — confirm indexes are created elsewhere.
CREATE TABLE tb_event (
pk_event SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
fk_app INTEGER NOT NULL REFERENCES tb_app(pk_app),
event_type TEXT NOT NULL, -- e.g. "page_view", "button_click"
user_id UUID, -- nullable: anonymous events have no user
session_id TEXT NOT NULL,
device JSONB NOT NULL DEFAULT '{}', -- device info extracted from headers
metadata JSONB NOT NULL DEFAULT '{}', -- event-type-specific payload
timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Pre-aggregated daily metrics, upserted by update_live_metrics. The UNIQUE
-- constraint is the upsert conflict target: one counter row per app, metric
-- type, dimension, and day.
CREATE TABLE tb_metric (
pk_metric SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
fk_app INTEGER NOT NULL REFERENCES tb_app(pk_app),
metric_type TEXT NOT NULL, -- e.g. "page_view_count"
value BIGINT NOT NULL DEFAULT 0, -- running counter for the day
dimension TEXT NOT NULL, -- segmentation axis, e.g. device OS
date DATE NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(fk_app, metric_type, dimension, date)
);
-- Saved user cohorts per app. `filter` is a JSONB predicate definition;
-- user_count is a denormalized size counter (maintained elsewhere).
CREATE TABLE tb_cohort (
pk_cohort SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
fk_app INTEGER NOT NULL REFERENCES tb_app(pk_app),
name TEXT NOT NULL,
filter JSONB NOT NULL DEFAULT '{}', -- cohort membership criteria
user_count INT NOT NULL DEFAULT 0, -- denormalized; refreshed out of band
created_at TIMESTAMPTZ DEFAULT now()
);
-- Views with JSONB data column
-- v_app exposes each app as (id, identifier, data) where `data` is the
-- JSONB shape served through GraphQL. api_key is deliberately excluded.
CREATE VIEW v_app AS
SELECT
a.id,
a.identifier,
jsonb_build_object(
'id', a.id::text,
'name', a.name,
'owner_id', a.owner_id::text,
'created_at', a.created_at
) AS data
FROM tb_app a;
-- v_metric mirrors tb_metric as a JSONB `data` column; fk_app is kept as a
-- plain column so the API layer can filter by app without unpacking JSONB.
CREATE VIEW v_metric AS
SELECT
m.id,
jsonb_build_object(
'id', m.id::text,
'metric_type', m.metric_type,
'value', m.value,
'dimension', m.dimension,
'date', m.date
) AS data,
m.fk_app
FROM tb_metric m;
# Batch event ingestion (hundreds of events at once)
# Device, user, and session context are derived server-side from request
# headers, so each event only carries its type-specific payload.
mutation {
trackEvents(
appId: "app123"
events: [
{
type: "page_view"
path: "/home"
metadata: { referrer: "/search" }
},
{
type: "button_click"
label: "Sign Up"
metadata: { screen: "onboarding" }
},
{
type: "screen_time"
duration: 45000
}
]
) {
success
eventCount
}
}
# Strong references to fire-and-forget tasks: asyncio keeps only weak refs to
# Tasks, so an otherwise-unreferenced Task can be garbage-collected mid-flight.
_background_tasks: set = set()

@fraiseql.mutation
async def track_events(info, app_id: str, events: list[dict]) -> dict:
    """Ingest a batch of events from a mobile app (optimized for throughput).

    Args:
        info: GraphQL resolver info; carries the request headers used to
            extract the API key, user id, session id, and device info.
        app_id: Public UUID of the app (tb_app.id).
        events: Raw client events; each may contain 'type' and 'metadata'.

    Returns:
        dict with 'success' and 'event_count' (number of events received).

    Raises:
        PermissionError: if the app id / API key pair does not match a row.
    """
    from datetime import timezone  # local import: module header not in view

    api_key = extract_api_key_from_header(info)
    # Authenticate: the API key must belong to the app identified by app_id.
    app = await ctx.db.query_one(
        "SELECT pk_app, id FROM tb_app WHERE id = $1 AND api_key = $2",
        [app_id, api_key]
    )
    if not app:
        # Specific exception type (was bare Exception) so callers can tell
        # auth failures apart from unexpected errors.
        raise PermissionError("Invalid app or API key")
    user_id = extract_user_id_from_request(info)
    session_id = extract_session_id_from_request(info)
    # Hoisted out of the comprehension: device info is per-request, not
    # per-event, so extracting it once avoids redundant header parsing.
    device = extract_device_info(info)
    # Timezone-aware now(): utcnow() is deprecated and returns a NAIVE
    # datetime, which a TIMESTAMPTZ column would interpret in the server zone.
    received_at = datetime.now(timezone.utc)
    # Normalize events with server-side defaults before the bulk insert.
    normalized_events = [
        {
            'fk_app': app['pk_app'],
            'event_type': event.get('type'),
            'user_id': user_id,
            'session_id': session_id,
            'device': device,
            'metadata': event.get('metadata', {}),
            'timestamp': received_at,
        }
        for event in events
    ]
    # Batch insert for performance — one round-trip for the whole batch.
    await ctx.db.execute_many(
        """
        INSERT INTO tb_event (fk_app, event_type, user_id, session_id, device, metadata, timestamp)
        VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7)
        """,
        [(e['fk_app'], e['event_type'], e['user_id'], e['session_id'],
          json.dumps(e['device']), json.dumps(e['metadata']), e['timestamp'])
         for e in normalized_events]
    )
    # Publish events to NATS for real-time processing.
    for event in normalized_events:
        await fraiseql.nats.publish(f"app:{app_id}:event", event)
    # Update rolling aggregations in the background; keep a strong reference
    # so the task cannot be garbage-collected before it runs.
    task = asyncio.create_task(update_live_metrics(app['pk_app'], events))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {
        'success': True,
        'event_count': len(events),
    }
# Get live metrics for dashboard
# Reads pre-aggregated rows from v_metric for the requested date range.
# NOTE(review): fields here are snake_case while other examples use
# camelCase (eventCount, userCount) — confirm the API naming convention.
query {
metrics(
appId: "app123"
dateRange: { from: "2024-01-01", to: "2024-01-31" }
) {
metric_type
value
dimension
date
}
}
# Get user cohorts
# Lists saved cohorts for an app, capped at `limit` results.
query {
cohorts(appId: "app123", limit: 20) {
id
name
userCount
retentionRate
}
}
# Get funnels (multi-step user journeys)
# `steps` is ordered: each entry returns the users reaching that step and
# the percentage lost since the previous one.
query {
funnel(
appId: "app123"
steps: ["onboarding", "payment_screen", "purchase_complete"]
) {
step
users
dropoffPercent
}
}
# Subscribe to live dashboard metrics
# Pushes a fresh snapshot every `interval` seconds.
subscription {
liveMetrics(appId: "app123", interval: 60) {
timestamp
pageviews
uniqueUsers
crashCount
avgSessionDuration
}
}
async def update_live_metrics(fk_app: int, events: list[dict]):
    """Roll up a batch of events into daily metrics.

    Pre-aggregates counts per (metric_type, dimension) in memory first, so
    the database sees ONE upsert per distinct metric instead of one per
    event (the previous per-event loop cost N round-trips per batch).

    Args:
        fk_app: Internal primary key of the app (tb_app.pk_app).
        events: Raw client events; each may carry 'type' and 'device'.
    """
    from datetime import timezone  # local import: module header not in view

    # Hoisted out of the loop; timezone-aware (utcnow() is deprecated/naive).
    today = datetime.now(timezone.utc).date()
    # Pre-aggregate: counter keyed by (metric_type, dimension).
    counts: dict = {}
    for event in events:
        event_type = event.get('type', 'unknown')
        dimension = event.get('device', {}).get('os', 'unknown')
        key = (f"{event_type}_count", dimension)
        counts[key] = counts.get(key, 0) + 1
    # One upsert per distinct metric; increment by the batch count.
    for (metric_type, dimension), count in counts.items():
        await ctx.db.execute(
            """
            INSERT INTO tb_metric (fk_app, metric_type, dimension, value, date)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (fk_app, metric_type, dimension, date)
            DO UPDATE SET value = tb_metric.value + EXCLUDED.value
            """,
            [fk_app, metric_type, dimension, count, today]
        )
@fraiseql.query
async def cohort_retention(info, app_id: str,
                           cohort_id: str,
                           interval: str = "day") -> list[dict]:
    """Calculate retention over time for a cohort.

    Returns one row per weekly offset over the last 30 days, each with
    'day_offset', 'date', and 'returning_users'.

    NOTE(review): `interval` and the cohort's `filter` JSON are currently
    not applied — retention is computed over ALL app users at fixed weekly
    steps, matching the original behavior. Confirm intended semantics
    before wiring them in.

    Raises:
        ValueError: if the cohort or app id does not match a row.
    """
    from datetime import timezone  # local import: module header not in view

    cohort = await ctx.db.query_one(
        "SELECT filter FROM tb_cohort WHERE id = $1",
        [cohort_id]
    )
    if not cohort:
        # Fail fast instead of silently computing retention for a bogus id
        # (the original fetched the cohort and never checked it).
        raise ValueError(f"Unknown cohort: {cohort_id}")
    # Resolve the app's internal key ONCE instead of re-running the same
    # subquery twice on every loop iteration.
    app = await ctx.db.query_one(
        "SELECT pk_app FROM tb_app WHERE id = $1",
        [app_id]
    )
    if not app:
        raise ValueError(f"Unknown app: {app_id}")
    today = datetime.now(timezone.utc).date()  # tz-aware; utcnow() deprecated
    retention_data = []
    for days_offset in range(0, 30, 7):  # weekly retention points over 30 days
        cohort_date = today - timedelta(days=days_offset)
        # Count in SQL: COUNT(DISTINCT ...) avoids shipping up to 10,000
        # user_id rows to Python just to call len(), and removes the silent
        # LIMIT cap that undercounted large cohorts.
        row = await ctx.db.query_one(
            """
            SELECT COUNT(DISTINCT user_id) AS returning_users
            FROM tb_event
            WHERE fk_app = $1
              AND DATE(timestamp) >= $2
              AND user_id IN (
                  SELECT DISTINCT user_id FROM tb_event
                  WHERE fk_app = $1
                    AND DATE(timestamp) = $2
              )
            """,
            [app['pk_app'], cohort_date]
        )
        retention_data.append({
            'day_offset': days_offset,
            'date': cohort_date,
            'returning_users': row['returning_users'],
        })
    return retention_data
# Delete old events (older than 90 days) automatically
@fraiseql.scheduled_task(cron="0 2 * * *")  # Daily at 2 AM
async def cleanup_old_events():
    """Archive events older than 90 days to S3, then delete them.

    Processes expired rows in fixed-size chunks so memory stays bounded even
    when millions of rows have expired (the original SELECT * loaded the
    entire backlog into one Python list and one S3 object). Each chunk is
    archived BEFORE it is deleted, so a failed upload aborts the run without
    losing data.
    """
    from datetime import timezone  # local import: module header not in view

    # Timezone-aware cutoff; utcnow() is deprecated and naive.
    cutoff = datetime.now(timezone.utc) - timedelta(days=90)
    chunk_size = 10_000  # bounds per-iteration memory and S3 object size
    chunk_index = 0
    while True:
        # Oldest-first so progress is monotonic across chunks.
        old_events = await ctx.db.query(
            "SELECT * FROM tb_event WHERE timestamp < $1 ORDER BY timestamp LIMIT $2",
            [cutoff, chunk_size]
        )
        if not old_events:
            break
        # Archive first; if this raises, nothing has been deleted yet.
        await s3_archive.put(
            f"events/archive-{cutoff.date()}-{chunk_index:05d}.json",
            json.dumps(old_events, default=str)
        )
        # Delete exactly the rows just archived (not a blind timestamp
        # re-scan), so archive and delete cannot drift apart.
        event_ids = [e['id'] for e in old_events]
        await ctx.db.execute(
            "DELETE FROM tb_event WHERE id = ANY($1)",
            [event_ids]
        )
        chunk_index += 1
  1. Clone the example

    Terminal window
    git clone https://github.com/fraiseql/examples/mobile-analytics-backend
    cd mobile-analytics-backend
  2. Set up environment

    Terminal window
    cp .env.example .env
    uv sync
  3. Start services (the postgres service ships with TimescaleDB for time-series)

    Terminal window
    docker-compose up -d postgres redis nats
  4. Run migrations

    Terminal window
    confiture migrate
  5. Start FraiseQL server

    Terminal window
    fraiseql run
  6. Generate test events

    Terminal window
    python scripts/generate_test_events.py
  1. Basic: Ingest and query events
  2. Aggregation: Calculate metrics and rollups
  3. Analysis: Build funnels and cohorts
  4. Scale: Handle millions of events/day
  5. Real-Time: Add live metrics subscriptions

Time-Series Patterns

Best practices for analytics data models.

Performance Tuning

Optimize high-throughput ingestion pipelines.

Scaling Guide

Horizontal scaling for millions of events.

Caching Strategy

Redis caching for aggregated metrics.