Federation + NATS Integration
Distributed Coordination — Combine federation with NATS
Advanced NATS patterns for building reliable, scalable event-driven systems with FraiseQL.
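The data flow is: a mutation runs its PostgreSQL function (fn_create_order), the observer picks up the change, and the observer publishes the event to NATS JetStream for external consumers. Python code defines the schema at compile time. All NATS communication happens inside the Rust runtime.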

    graph LR
        subgraph PYTHON["Python (compile-time)"]
            schema["@fraiseql.mutation\nsql_source='fn_create_order'"]
        end

        subgraph RUST["Rust Runtime"]
            server["FraiseQL Server"]
            observer["Observer Worker"]
        end

        subgraph POSTGRES["PostgreSQL"]
            fn["fn_create_order()"]
            pg_notify["LISTEN/NOTIFY\nor CDC"]
        end

        subgraph NATS["NATS JetStream"]
            stream["fraiseql_events stream\nfraiseql.mutation.Order.INSERT"]
        end

        subgraph CONSUMERS["External Consumers"]
            svc_a["Email Service"]
            svc_b["Analytics Service"]
        end

        schema -->|"schema.json → compile"| server
        server --> fn
        fn --> pg_notify
        pg_notify --> observer
        observer -->|"publish"| stream
        stream --> svc_a
        stream --> svc_b

Enable the NATS observer backend in your main config file (fraiseql.toml):

    [project]
    name = "my-app"
    version = "1.0.0"

    [fraiseql]
    schema_file = "schema.json"
    output_file = "schema.compiled.json"

    [observers]
    backend = "nats"
    nats_url = "${NATS_URL}"

nats_url is read from the NATS_URL environment variable at runtime. For a single server, use nats://localhost:4222. For a cluster, set NATS_URL to the URL of any cluster member; NATS client libraries handle cluster discovery automatically.
The observer worker is a separate process that reads from NATS and executes observer actions. Its advanced transport settings live in fraiseql-observer.toml:

    [transport]
    transport = "nats"

    [transport.nats]
    url = "${NATS_URL}"
    stream_name = "fraiseql_events"
    subject_prefix = "fraiseql.mutation"
    consumer_name = "fraiseql_observer_worker"

    [transport.nats.jetstream]
    max_bytes = 10_737_418_240    # 10 GB retention limit
    max_age_days = 7              # Retain events for 7 days
    max_deliver = 3               # Retry delivery up to 3 times
    ack_wait_secs = 30            # Re-deliver if not acked within 30s
    dedup_window_minutes = 5      # Deduplicate messages within 5-min window

FraiseQL publishes events on subjects following this pattern:

    {subject_prefix}.{EntityType}.{OPERATION}

With the default subject_prefix = "fraiseql.mutation":

| Mutation | NATS Subject |
|---|---|
| fn_create_order (INSERT) | fraiseql.mutation.Order.INSERT |
| fn_update_order (UPDATE) | fraiseql.mutation.Order.UPDATE |
| fn_delete_order (DELETE) | fraiseql.mutation.Order.DELETE |
| fn_create_user (INSERT) | fraiseql.mutation.User.INSERT |

External consumers subscribe to these subjects directly using any NATS client library (Go, Node.js, Python nats.py, etc.).
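
When choosing which subjects a consumer subscribes to, NATS wildcards are useful: `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below mimics that matching in plain Python so the effect of a filter can be checked offline. The helper name `subject_matches` is illustrative and not part of FraiseQL or any NATS client; a real NATS server does this matching for you.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches a NATS-style subscription `pattern`.

    `*` matches exactly one token; `>` matches one or more trailing tokens
    and is only valid as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # `>` must be last and needs at least one remaining subject token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


subject = "fraiseql.mutation.Order.INSERT"
print(subject_matches("fraiseql.mutation.Order.*", subject))  # every Order operation
print(subject_matches("fraiseql.mutation.>", subject))        # every FraiseQL mutation
print(subject_matches("fraiseql.mutation.User.*", subject))   # User events only
```

A service that only cares about orders could subscribe to `fraiseql.mutation.Order.*`, while an audit log might use `fraiseql.mutation.>`.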
Define your mutation in Python (schema declaration only — no runtime code):

    import fraiseql
    from fraiseql.scalars import ID, DateTime

    @fraiseql.type
    class Order:
        id: ID
        identifier: str
        status: str
        total_amount: float
        created_at: DateTime

    @fraiseql.input
    class CreateOrderInput:
        items: list[str]
        shipping_address: str

    @fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
    def create_order(input: CreateOrderInput) -> Order:
        """Create an order. The Rust observer will publish to NATS after this succeeds."""
        pass

The PostgreSQL function does the work; the Rust observer publishes the NATS event after a successful commit:

    CREATE OR REPLACE FUNCTION fn_create_order(
        p_items TEXT[],
        p_shipping_address TEXT
    )
    RETURNS mutation_response
    LANGUAGE plpgsql AS $$
    DECLARE
        v_id UUID;
        v_pk BIGINT;
        v_slug TEXT := gen_random_uuid()::text;
    BEGIN
        INSERT INTO tb_order (identifier, items, shipping_address, status)
        VALUES (v_slug, p_items, p_shipping_address, 'pending')
        RETURNING pk_order, id INTO v_pk, v_id;

        RETURN ROW(
            'success', NULL, v_id, 'Order',
            (SELECT data FROM v_order WHERE id = v_id),
            NULL, NULL, NULL
        )::mutation_response;
    END;
    $$;

After fn_create_order returns status = 'success', the FraiseQL observer automatically publishes to fraiseql.mutation.Order.INSERT.
For cases where you need to publish directly from the database (e.g. background jobs or triggers that fire outside of a FraiseQL mutation), use pg_notify. The FraiseQL observer listens on PostgreSQL channels and bridges to NATS:

    -- PostgreSQL trigger publishes via LISTEN/NOTIFY
    CREATE OR REPLACE FUNCTION notify_order_created()
    RETURNS TRIGGER AS $$
    BEGIN
        PERFORM pg_notify(
            'fraiseql_events',
            json_build_object(
                'entity_type', 'Order',
                'operation', 'INSERT',
                'entity_id', NEW.id::text,
                'timestamp', NOW()
            )::text
        );
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER tr_order_created
    AFTER INSERT ON tb_order
    FOR EACH ROW
    EXECUTE FUNCTION notify_order_created();

The FraiseQL observer bridges pg_notify events to NATS JetStream, so external consumers still receive them on the standard subject.
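
To make the bridging step concrete, here is a rough Python sketch of how a pg_notify payload with the fields used above could map to a subject. It is illustrative only: the real bridge runs inside the Rust observer, and the function name `subject_for_payload`, the validation rules, and the sample payload values are assumptions.

```python
import json

SUBJECT_PREFIX = "fraiseql.mutation"  # default subject_prefix from the config
ALLOWED_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def subject_for_payload(raw_payload: str, prefix: str = SUBJECT_PREFIX) -> str:
    """Derive a NATS subject from a pg_notify JSON payload (hypothetical helper)."""
    payload = json.loads(raw_payload)
    entity_type = payload["entity_type"]
    operation = payload["operation"].upper()
    if operation not in ALLOWED_OPERATIONS:
        raise ValueError(f"unsupported operation: {operation}")
    # Subject tokens are dot-separated, so reject entity names that would
    # add tokens or contain wildcards or whitespace.
    if not entity_type or any(ch in entity_type for ch in ".*> \t"):
        raise ValueError(f"invalid entity type: {entity_type!r}")
    return f"{prefix}.{entity_type}.{operation}"


# Made-up sample payload shaped like the trigger's json_build_object output.
example = json.dumps({
    "entity_type": "Order",
    "operation": "INSERT",
    "entity_id": "00000000-0000-4000-8000-000000000001",
    "timestamp": "2024-01-01T00:00:00Z",
})
print(subject_for_payload(example))
```

Validating the entity type before building the subject matters because a stray `.` or wildcard character would change which subscriptions the event reaches.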
JetStream provides at-least-once delivery by default. The observer worker acknowledges each message after it successfully executes its action. If the worker crashes before acknowledging, NATS re-delivers after ack_wait_secs.
Configure in fraiseql-observer.toml:

    [transport.nats.jetstream]
    max_deliver = 3       # Retry delivery up to 3 times
    ack_wait_secs = 30    # Re-deliver if not acked within 30s

Your consumer logic must be idempotent, because a message may arrive more than once.
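
One way to make a consumer tolerate redelivery is to remember which events it has already handled. The sketch below is a minimal in-memory version, assuming events carry the `entity_type`, `operation`, `entity_id`, and `timestamp` fields shown in the pg_notify payload; the class name `IdempotentHandler` and the choice of key are hypothetical, and a real service would keep the keys in durable storage.

```python
import json


class IdempotentHandler:
    """Run an action at most once per event, even if the event is redelivered."""

    def __init__(self, action):
        self._action = action
        self._seen = set()  # illustration only; use durable storage in practice

    def handle(self, raw: bytes) -> bool:
        """Process one message body; return False if it was a duplicate."""
        event = json.loads(raw.decode())
        # Assumed idempotency key: the fields that identify one change.
        key = (
            event["entity_type"],
            event["operation"],
            event["entity_id"],
            event.get("timestamp"),
        )
        if key in self._seen:
            return False
        self._action(event)
        # Record the key only after the action succeeds, so a failed
        # attempt is retried on the next delivery.
        self._seen.add(key)
        return True


sent = []
handler = IdempotentHandler(sent.append)
body = json.dumps({
    "entity_type": "Order",
    "operation": "INSERT",
    "entity_id": "00000000-0000-4000-8000-000000000001",
    "timestamp": "2024-01-01T00:00:00Z",
}).encode()
print(handler.handle(body))  # first delivery
print(handler.handle(body))  # redelivery of the same event
print(len(sent))             # number of times the action ran
```

In a JetStream consumer, both outcomes would be acknowledged: a duplicate has already been handled, so there is no reason to ask for it again.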
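When max_deliver is exhausted, JetStream stops redelivering the message to that consumer and publishes an advisory on $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>. The message itself stays in the stream until retention removes it. Subscribe to $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.> to detect permanently failed events.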
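
The advisory body is JSON that identifies the message that ran out of delivery attempts. The sketch below parses one into a small record. The field names (`stream`, `consumer`, `stream_seq`, `deliveries`) follow the JetStream max-deliveries advisory schema as commonly documented; treat them as an assumption and check them against your NATS server version. `FailedDelivery`, `parse_max_deliveries_advisory`, and the sample values are made up for illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FailedDelivery:
    stream: str
    consumer: str
    stream_seq: int   # sequence number of the message inside the stream
    deliveries: int   # how many delivery attempts were made


def parse_max_deliveries_advisory(data: bytes) -> FailedDelivery:
    """Extract the fields needed to locate a permanently failed message."""
    advisory = json.loads(data.decode())
    return FailedDelivery(
        stream=advisory["stream"],
        consumer=advisory["consumer"],
        stream_seq=int(advisory["stream_seq"]),
        deliveries=int(advisory["deliveries"]),
    )


# Made-up advisory body for demonstration.
sample = json.dumps({
    "type": "io.nats.jetstream.advisory.v1.max_deliver",
    "stream": "fraiseql_events",
    "consumer": "fraiseql_observer_worker",
    "stream_seq": 42,
    "deliveries": 3,
}).encode()
print(parse_max_deliveries_advisory(sample))
```

A monitoring service could feed each advisory message body into this parser and use `stream_seq` to look up the stored event for inspection or manual reprocessing.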

    [transport.nats.jetstream]
    max_deliver = 3
    dedup_window_minutes = 5    # Ignore duplicate publishes of the same message within 5 min

Set dedup_window_minutes so that JetStream discards a repeated publish of the same event within the deduplication window. FraiseQL includes a message ID derived from the mutation's transaction ID in the NATS message headers, which JetStream uses for deduplication. Deduplication applies when a message is published to the stream, not when it is redelivered to a consumer, so consumers still need to be idempotent.
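
To show how a deduplication window keyed on a message ID behaves, here is a toy model in plain Python. It is not JetStream's implementation; JetStream does this server-side using the `Nats-Msg-Id` header. The class name `DedupWindow` and the message ID format are hypothetical.

```python
class DedupWindow:
    """Toy model of a publish-side deduplication window keyed by message ID."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._first_seen = {}  # message ID -> time it was first accepted

    def accept(self, msg_id: str, now: float) -> bool:
        """Return True if the message is stored, False if it is a duplicate."""
        # Forget IDs that have aged out of the window.
        expired = [
            m for m, t in self._first_seen.items()
            if now - t >= self.window_seconds
        ]
        for m in expired:
            del self._first_seen[m]
        if msg_id in self._first_seen:
            return False
        self._first_seen[msg_id] = now
        return True


window = DedupWindow(window_seconds=5 * 60)  # dedup_window_minutes = 5
print(window.accept("tx-1001", now=0))    # first publish
print(window.accept("tx-1001", now=120))  # same ID two minutes later
print(window.accept("tx-1001", now=301))  # same ID after the window has passed
```

The last call shows the limit of the mechanism: once the window has passed, the same ID is accepted again, so the window should comfortably exceed the longest publish retry you expect.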
JetStream retains events according to max_age_days and max_bytes. External consumers can replay history by subscribing with a DeliverPolicy set via their NATS client library:

    [transport.nats.jetstream]
    max_age_days = 7              # Events retained for 7 days
    max_bytes = 10_737_418_240    # Up to 10 GB

To replay from the start, configure your external consumer's NATS subscription with DeliverAll (or the equivalent in your client library). This is a client-side configuration, not a FraiseQL configuration.
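
With nats-py, the deliver policy can be passed through a `ConsumerConfig`. The following is a sketch rather than a tested integration: it assumes a NATS server at `nats://localhost:4222`, the stream and subject names used elsewhere on this page, and a hypothetical `replay_all` helper that stops once no message arrives for two seconds.

```python
import json

NATS_URL = "nats://localhost:4222"
STREAM = "fraiseql_events"
SUBJECT = "fraiseql.mutation.Order.INSERT"


async def replay_all(handle_event, limit: int = 1000) -> int:
    """Replay up to `limit` retained events from the start of the stream."""
    # Imported here so the module can be loaded without nats-py installed.
    import nats
    from nats.errors import TimeoutError as NatsTimeoutError
    from nats.js.api import ConsumerConfig, DeliverPolicy

    nc = await nats.connect(NATS_URL)
    replayed = 0
    try:
        js = nc.jetstream()
        # No durable name, so this creates an ephemeral consumer that starts
        # at the oldest retained message instead of resuming a saved position.
        sub = await js.subscribe(
            SUBJECT,
            stream=STREAM,
            config=ConsumerConfig(deliver_policy=DeliverPolicy.ALL),
        )
        while replayed < limit:
            try:
                msg = await sub.next_msg(timeout=2)
            except NatsTimeoutError:
                break  # Nothing arrived within the timeout; assume we caught up.
            handle_event(json.loads(msg.data.decode()))
            await msg.ack()
            replayed += 1
    finally:
        await nc.close()
    return replayed
```

Running `asyncio.run(replay_all(print, limit=10))` against a live server should hand the ten oldest retained Order INSERT events to `print`. Events older than max_age_days, or evicted because of max_bytes, are no longer available for replay.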
External services consume FraiseQL events using any NATS client library. Here is an example using the nats-py library. Note that this is your external service code, completely separate from the FraiseQL schema:

    """External consumer for FraiseQL NATS events.

    This is NOT part of the FraiseQL Python SDK.
    It is a separate service that subscribes to FraiseQL NATS events
    using the standard nats-py library.
    """
    import asyncio
    import json

    import nats  # pip install nats-py (NOT fraiseql)

    NATS_URL = "nats://localhost:4222"
    STREAM = "fraiseql_events"
    SUBJECT = "fraiseql.mutation.Order.INSERT"


    async def handle_order_created(msg):
        """Handle new order events from FraiseQL mutations."""
        event = json.loads(msg.data.decode())
        order_id = event["entity_id"]
        order_data = event.get("entity", {})

        print(f"New order created: {order_id}")
        # ... send confirmation email, update analytics, etc.

        await msg.ack()  # Acknowledge after successful processing


    async def main():
        nc = await nats.connect(NATS_URL)
        js = nc.jetstream()

        # Subscribe to FraiseQL Order INSERT events
        sub = await js.subscribe(
            SUBJECT,
            durable="email-service-order-handler",  # Durable = survives restart
            stream=STREAM,
        )

        async for msg in sub.messages:
            try:
                await handle_order_created(msg)
            except Exception as exc:
                print(f"Processing failed: {exc}")
                # A NAK asks JetStream to redeliver without waiting for ack_wait_secs
                await msg.nak()


    if __name__ == "__main__":
        asyncio.run(main())

The FraiseQL observer exposes Prometheus metrics with the fraiseql_observer_ prefix:

| Metric | Description |
|---|---|
| fraiseql_observer_events_processed_total | Total events processed successfully |
| fraiseql_observer_events_failed_total | Total events that failed processing |
| fraiseql_observer_events_retried_total | Total events redelivered by JetStream |
| fraiseql_observer_processing_duration_seconds | Histogram of event processing latency |

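
As a rough illustration of using these counters, the sketch below computes a failure ratio from metrics in the Prometheus text exposition format. It works on text you have already scraped; where the observer serves its metrics is not covered here. The function names and the sample numbers are made up.

```python
def parse_counters(metrics_text: str) -> dict:
    """Sum sample values per metric name from Prometheus text format."""
    totals = {}
    for line in metrics_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and HELP/TYPE comments
        if "{" in line:
            # Sample with labels: name{labels} value [timestamp]
            name = line.split("{", 1)[0]
            value_part = line.rsplit("}", 1)[1]
        else:
            # Sample without labels: name value [timestamp]
            name, value_part = line.split(None, 1)
        value = float(value_part.split()[0])
        totals[name] = totals.get(name, 0.0) + value
    return totals


def failure_ratio(metrics_text: str) -> float:
    """Failed events divided by all finished events (0.0 if there are none)."""
    totals = parse_counters(metrics_text)
    processed = totals.get("fraiseql_observer_events_processed_total", 0.0)
    failed = totals.get("fraiseql_observer_events_failed_total", 0.0)
    finished = processed + failed
    return failed / finished if finished else 0.0


# Made-up sample scrape.
sample_metrics = """\
# TYPE fraiseql_observer_events_processed_total counter
fraiseql_observer_events_processed_total 980
# TYPE fraiseql_observer_events_failed_total counter
fraiseql_observer_events_failed_total 20
"""
print(failure_ratio(sample_metrics))
```

In practice a Prometheus alert rule on the same two counters is the more usual choice; this version is mainly handy for ad hoc checks and tests.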
Monitor consumer lag via the NATS monitoring endpoint (http://localhost:8222/jsz) or via the nats CLI:

    # Show consumer status and pending message count
    nats consumer info fraiseql_events fraiseql_observer_worker

    # Stream info (bytes, message count, retention)
    nats stream info fraiseql_events

Observer Webhook Patterns
Webhooks & Observers — External notifications
NATS Docs
Official NATS Documentation — Complete NATS reference