# Analytics

Analytics patterns and aggregations.
FraiseQL includes an Apache Arrow Flight dataplane for high-throughput columnar data access, enabling efficient analytics, OLAP queries, and data science workloads.
Arrow Flight provides a high-throughput, columnar alternative to the JSON GraphQL endpoint for bulk reads. Use this table to decide when to reach for Arrow Flight versus the standard GraphQL endpoint:
| Dimension | GraphQL Endpoint | Arrow Flight |
|---|---|---|
| Format | JSON (row-oriented) | Apache Arrow IPC (columnar) |
| Typical throughput | ~50 MB/s | ~500 MB/s |
| Best for | CRUD, real-time queries, web clients | Analytics, ML, ETL, data science |
| Client libraries | Any HTTP client | Arrow Flight SDK required |
| Filtering | GraphQL where arguments | Pushed down to PostgreSQL |
| Streaming | NDJSON via Wire protocol | Native chunked record batches |
| Authentication | JWT / API key | Bearer token (same as GraphQL) |
| Port | 3000 (default) | 50051 (default) |
Arrow Flight is a compile-time feature — there is no `[arrow]` TOML configuration section. Enable it by building FraiseQL with the `arrow` feature flag:

```shell
cargo build --release --features arrow
```

When the `arrow` feature is compiled in, the Flight gRPC server starts automatically on port 50051 alongside the HTTP server (port 3000). Authentication uses the same JWT/API key tokens as the GraphQL endpoint — pass the token as a gRPC `authorization` header (`Bearer eyJ...`). TLS follows the server-level TLS configuration.
Create views optimized for columnar access:
```sql
-- Analytics view for Arrow
CREATE VIEW va_orders_daily AS
SELECT
    date_trunc('day', created_at) AS order_date,
    customer_region,
    product_category,
    COUNT(*) AS order_count,
    SUM(total) AS revenue,
    AVG(total) AS avg_order_value
FROM tb_order o
JOIN tb_customer c ON c.pk_customer = o.fk_customer
JOIN tb_product p ON p.pk_product = o.fk_product
GROUP BY 1, 2, 3;
```

Create tables for columnar storage:

```sql
-- Fact table for Arrow analytics
CREATE TABLE ta_events (
    event_id    UUID,
    event_type  TEXT,
    user_id     UUID,
    session_id  UUID,
    page_url    TEXT,
    referrer    TEXT,
    device_type TEXT,
    country     TEXT,
    properties  JSONB,
    created_at  TIMESTAMPTZ
);

-- Columnar storage (if using TimescaleDB or similar)
SELECT create_hypertable('ta_events', 'created_at');
```

Expose the view through an Arrow-backed type and query:

```python
import fraiseql
from fraiseql.scalars import ID, DateTime, Decimal


@fraiseql.type(dataplane="arrow")
class OrdersDaily:
    """Daily order aggregates for analytics."""
    order_date: DateTime
    customer_region: str
    product_category: str
    order_count: int
    revenue: Decimal
    avg_order_value: Decimal


@fraiseql.arrow_query(sql_source="va_orders_daily")
def orders_daily(
    start_date: DateTime,
    end_date: DateTime,
    regions: list[str] | None = None,
) -> list[OrdersDaily]:
    """Query daily order aggregates."""
    pass
```
Query the data from Python with the Arrow Flight client:

```python
import json

import pyarrow.flight as flight

# Connect to FraiseQL Arrow Flight server
client = flight.connect("grpc://localhost:50051")

# Authenticate
token = client.authenticate_basic_token("user", "password")
options = flight.FlightCallOptions(headers=[(b"authorization", token)])

# Execute query
ticket = flight.Ticket(json.dumps({
    "query": "orders_daily",
    "variables": {"start_date": "2024-01-01", "end_date": "2024-12-31"},
}).encode())
reader = client.do_get(ticket, options)

# Read as Arrow Table
table = reader.read_all()
print(table.schema)
# order_date: timestamp[us, tz=UTC]
# customer_region: string
# product_category: string
# order_count: int64
# revenue: decimal128(18, 2)
# avg_order_value: decimal128(18, 2)

print(table.to_pandas())
#    order_date customer_region product_category  order_count   revenue  avg_order_value
# 0  2024-01-01              US      Electronics         1234  98765.00            80.04
# 1  2024-01-01              EU      Electronics          876  71234.00            81.32
# ...
```
Wrap the call in a helper that returns a pandas DataFrame:

```python
import json

import pandas as pd
import pyarrow.flight as flight


def query_fraiseql(query: str, variables: dict | None = None) -> pd.DataFrame:
    """Execute FraiseQL Arrow query and return DataFrame."""
    client = flight.connect("grpc://localhost:50051")

    payload = {"query": query, "variables": variables or {}}
    ticket = flight.Ticket(json.dumps(payload).encode())

    reader = client.do_get(ticket)
    return reader.read_pandas()


# Usage
df = query_fraiseql(
    "orders_daily",
    {"start_date": "2024-01-01", "end_date": "2024-12-31"},
)

print(df.head())
#    order_date customer_region product_category  order_count   revenue
# 0  2024-01-01              US      Electronics         1234  98765.00
# 1  2024-01-01              EU      Electronics          876  71234.00

print(f"Total revenue: ${df['revenue'].sum():,.2f}")
# Total revenue: $4,521,890.00
```
The same queries can be issued from TypeScript:

```typescript
import { FlightClient } from '@apache-arrow/flight';
import { tableFromIPC, RecordBatch } from 'apache-arrow';

async function queryArrowFlight(
  query: string,
  variables: Record<string, unknown> = {}
): Promise<RecordBatch[]> {
  const client = new FlightClient({ address: 'grpc://localhost:50051' });

  const ticket = Buffer.from(JSON.stringify({ query, variables }));
  const batches: RecordBatch[] = [];

  for await (const batch of client.doGet({ ticket })) {
    batches.push(batch);
  }

  await client.close();
  return batches;
}

// Usage
const batches = await queryArrowFlight('orders_daily', {
  start_date: '2024-01-01',
  end_date: '2024-12-31'
});

const table = tableFromIPC(batches);
console.log(`Rows: ${table.numRows}`);
// Rows: 42850

for (const row of table) {
  console.log(row.toJSON());
  // { order_date: '2024-01-01', customer_region: 'US', revenue: 98765.00 }
}
```

Install the required packages:

```shell
npm install apache-arrow @apache-arrow/flight
```
Query FraiseQL directly from DuckDB:

```python
import duckdb

# Register Arrow Flight data source
duckdb.execute("""
    INSTALL arrow;
    LOAD arrow;
""")

# Query directly from FraiseQL
result = duckdb.execute("""
    SELECT order_date, SUM(revenue) AS total_revenue
    FROM arrow_scan('grpc://localhost:50051', 'orders_daily')
    WHERE order_date >= '2024-01-01'
    GROUP BY order_date
    ORDER BY order_date
""").fetchdf()
```
Stream large result sets in chunks:

```python
import json
from collections.abc import Iterator

import pandas as pd
import pyarrow.flight as flight


def stream_large_dataset(query: str, chunk_size: int = 100000) -> Iterator[pd.DataFrame]:
    """Stream large datasets in chunks."""
    client = flight.connect("grpc://localhost:50051")

    ticket = flight.Ticket(json.dumps({"query": query}).encode())
    reader = client.do_get(ticket)

    for chunk in reader:
        batch = chunk.data
        # Process batch
        yield batch.to_pandas()


# Usage: process 50M rows without loading all into memory
for df_chunk in stream_large_dataset("large_events_export"):
    process(df_chunk)
    print(f"Processed {len(df_chunk)} rows")
```
Streaming also enables incremental aggregation without materializing the full result:

```python
import json

import pyarrow.flight as flight
import pyarrow.compute as pc


def aggregate_without_loading_all(query: str) -> dict:
    """Sum revenue without loading entire dataset into memory."""
    client = flight.connect("grpc://localhost:50051")

    ticket = flight.Ticket(json.dumps({"query": query}).encode())
    reader = client.do_get(ticket)

    total_revenue = 0.0
    row_count = 0

    for chunk in reader:
        batch = chunk.data
        revenue_col = batch.column("revenue")
        total_revenue += float(pc.sum(revenue_col).as_py())  # Decimal -> float
        row_count += len(batch)

    return {"total_revenue": total_revenue, "rows": row_count}


result = aggregate_without_loading_all("orders_daily")
print(result)
# {'total_revenue': 4521890.0, 'rows': 42850}
```

Aggregate metrics hourly with percentiles:

```sql
CREATE VIEW va_metrics_hourly AS
SELECT
    date_trunc('hour', created_at) AS hour,
    metric_name,
    COUNT(*) AS sample_count,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS p50,
    percentile_cont(0.95) WITHIN GROUP (ORDER BY value) AS p95,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY value) AS p99
FROM ta_metrics
GROUP BY 1, 2;
```

Cohort analysis by month of first purchase:

```sql
CREATE VIEW va_user_cohorts AS
WITH first_purchase AS (
    SELECT user_id, date_trunc('month', MIN(created_at)) AS cohort_month
    FROM tb_order
    GROUP BY 1
)
SELECT
    fp.cohort_month,
    date_trunc('month', o.created_at) AS order_month,
    COUNT(DISTINCT o.user_id) AS users,
    SUM(o.total) AS revenue
FROM first_purchase fp
JOIN tb_order o ON o.user_id = fp.user_id
GROUP BY 1, 2;
```

Conversion funnel by session:

```sql
CREATE VIEW va_funnel AS
WITH events AS (
    SELECT
        session_id,
        event_type,
        created_at,
        ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY created_at) AS event_order
    FROM ta_events
    WHERE event_type IN ('page_view', 'add_to_cart', 'checkout', 'purchase')
)
SELECT
    date_trunc('day', e1.created_at) AS day,
    COUNT(DISTINCT e1.session_id) AS page_views,
    COUNT(DISTINCT e2.session_id) AS add_to_cart,
    COUNT(DISTINCT e3.session_id) AS checkout,
    COUNT(DISTINCT e4.session_id) AS purchase
FROM events e1
LEFT JOIN events e2 ON e2.session_id = e1.session_id AND e2.event_type = 'add_to_cart'
LEFT JOIN events e3 ON e3.session_id = e1.session_id AND e3.event_type = 'checkout'
LEFT JOIN events e4 ON e4.session_id = e1.session_id AND e4.event_type = 'purchase'
WHERE e1.event_type = 'page_view'
GROUP BY 1;
```

Choose a record batch size appropriate to the workload:

| Batch Size | Use Case |
|---|---|
| 1,024 | Low latency, small queries |
| 65,536 | General purpose (default) |
| 1,000,000 | Large analytics, high throughput |
FraiseQL pushes filters to the database:
```python
# This query
query_fraiseql(
    "orders_daily",
    {"start_date": "2024-01-01", "regions": ["US", "CA"]},
)

# Generates SQL with WHERE clause:
# SELECT ... FROM va_orders_daily
# WHERE order_date >= '2024-01-01'
#   AND customer_region IN ('US', 'CA')
```

Only requested columns are transferred:

```python
# Request specific columns
ticket = flight.Ticket(json.dumps({
    "query": "orders_daily",
    "columns": ["order_date", "revenue"],  # Only these columns
}).encode())
```

The Arrow Flight server exposes these Prometheus metrics:

| Metric | Description |
|---|---|
| `fraiseql_arrow_requests_total` | Total Arrow Flight requests |
| `fraiseql_arrow_bytes_sent` | Bytes transferred |
| `fraiseql_arrow_batches_sent` | Number of record batches |
| `fraiseql_arrow_query_duration` | Query execution time |
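These counters come back in Prometheus text exposition format when you scrape the metrics endpoint on port 9090. A small sketch for pulling the Arrow counters into a dict; `parse_arrow_metrics` is a helper name chosen here, and the sample values are illustrative:

```python
def parse_arrow_metrics(text: str) -> dict[str, float]:
    """Parse fraiseql_arrow_* counters from Prometheus text exposition."""
    metrics: dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line.startswith("fraiseql_arrow"):
            continue  # skip comments and unrelated metrics
        name, _, value = line.partition(" ")
        metrics[name] = float(value)
    return metrics


# Sample scrape output (values are illustrative)
sample = """\
# TYPE fraiseql_arrow_requests_total counter
fraiseql_arrow_requests_total 42
fraiseql_arrow_bytes_sent 184320000
"""

print(parse_arrow_metrics(sample))
# {'fraiseql_arrow_requests_total': 42.0, 'fraiseql_arrow_bytes_sent': 184320000.0}
```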
Check the Flight server status from the CLI:

```shell
fraiseql status --arrow

# Arrow Flight Server
# Status: Running
# Port: 50051
# Active connections: 12
# Bytes sent (24h): 1.2 TB
# Queries (24h): 45,230
```

Arrow queries enforce the same scope-based authorization as the GraphQL endpoint:

```python
@fraiseql.arrow_query(
    sql_source="va_sensitive_metrics",
    requires_scope="analytics:read",
)
def sensitive_metrics() -> list[Metric]:
    """Requires analytics:read scope."""
    pass
```

Use dedicated prefixes for analytics tables and views:

| Data Type | Table Prefix | Use Case |
|---|---|---|
| Aggregates | va_ | Pre-computed summaries |
| Raw events | ta_ | Event streams, logs |
| Facts | tf_ | Star schema analytics |
```sql
-- Index on commonly filtered columns
CREATE INDEX idx_ta_events_created ON ta_events(created_at);
CREATE INDEX idx_ta_events_type ON ta_events(event_type);
CREATE INDEX idx_ta_events_user ON ta_events(user_id);
```

Start FraiseQL with Arrow enabled:
```shell
fraiseql run
```

Check logs for Arrow Flight startup:

```
INFO: Arrow Flight dataplane listening on 0.0.0.0:50051
```

Test with a simple query:
```python
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:50051")
ticket = flight.Ticket(b'{"query": "orders_daily"}')
reader = client.do_get(ticket)
table = reader.read_all()
print(f"Rows returned: {table.num_rows}")
print(f"Columns: {table.column_names}")
```

Expected output:

```
Rows returned: 365
Columns: ['order_date', 'customer_region', 'product_category', 'order_count', 'revenue', 'avg_order_value']
```

Verify streaming for large datasets:
```python
# Count rows without loading all into memory
client = flight.connect("grpc://localhost:50051")
ticket = flight.Ticket(b'{"query": "large_export"}')
reader = client.do_get(ticket)

row_count = 0
for chunk in reader:
    row_count += chunk.data.num_rows
    print(f"Processed batch: {chunk.data.num_rows} rows")

print(f"Total rows: {row_count}")
```

Expected output:

```
Processed batch: 65536 rows
Processed batch: 65536 rows
Processed batch: 18928 rows
Total rows: 150000
```

Check the metrics endpoint:
```shell
curl http://localhost:9090/metrics | grep fraiseql_arrow
```

Expected output:

```
fraiseql_arrow_requests_total 42
fraiseql_arrow_rows_streamed_total 2845000
fraiseql_arrow_bytes_sent 184320000
```

If you get `Connection refused` when connecting:
Verify FraiseQL was compiled with the `arrow` feature:

```shell
fraiseql --version  # should mention Arrow Flight
```

Check the port is correct:

```shell
netstat -tlnp | grep 50051
```

Also check the FraiseQL logs for Arrow Flight startup errors.
Arrow Flight is only as fast as the underlying SQL query:
```sql
-- Check query execution time in PostgreSQL
EXPLAIN ANALYZE SELECT * FROM va_orders_daily;
```

If the query is slow, optimize it in PostgreSQL first (for example with the indexes shown earlier, or a materialized view), and increase `max_batch_size` for better throughput.

If clients run out of memory:

```python
# Use chunked reading instead of read_all()
for chunk in reader:
    process(chunk.data.to_pandas())  # Process one chunk at a time
```