NATS Integration
Complete NATS reference and configuration.
A complete event-driven order processing system using FraiseQL’s NATS integration with JetStream persistence, CDC (Change Data Capture), and multiple microservices.
Separation of concerns:
FraiseQL services declare types, queries, and mutations (@fraiseql.type, @fraiseql.query, @fraiseql.mutation). The SQL function calls pg_notify, which FraiseQL observers forward to NATS automatically based on the fraiseql.toml configuration. External microservices use the standard nats.py library: they subscribe to NATS subjects and handle downstream processing — they are not FraiseQL services. In fraiseql.toml, enable NATS as the observer backend:
# Per-table observer entries (array of tables/events to watch)
[[observers]]
table = "tb_order"
event = "INSERT"
subject = "order_created"

[[observers]]
table = "tb_order"
event = "UPDATE"
subject = "order_status_changed"

# Global observer backend settings
# NOTE(review): TOML forbids a plain [observers] table alongside [[observers]]
# array entries — confirm the real section name (e.g. [observer]) against the
# FraiseQL configuration reference.
[observers]
backend = "nats"
nats_url = "${NATS_URL}"  # e.g. nats://nats-1:4222,nats://nats-2:4222

Advanced JetStream settings (stream retention, consumers, DLQ) are configured in the observer runtime config file (fraiseql-observer.toml):
# Observer runtime transport selection
[transport]
transport = "nats"

# NATS connection and JetStream identifiers used by the observer worker
[transport.nats]
url = "${NATS_URL}"
stream_name = "fraiseql_events"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"
# JetStream stream/consumer limits for the observer
[transport.nats.jetstream]
max_age_days = 30
max_bytes = 10_737_418_240  # 10 GB
max_msgs = 1_000_000
max_deliver = 3
ack_wait_secs = 30
dedup_window_minutes = 5

-- Orders: surrogate BIGINT pk, public UUID id, human-readable identifier
CREATE TABLE tb_order (
    pk_order BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT NOT NULL UNIQUE,
    customer_id UUID NOT NULL,
    total DECIMAL(10, 2) NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
-- Order line items; fk_order references the surrogate key of tb_order
CREATE TABLE tb_order_item (
    pk_order_item BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT NOT NULL UNIQUE,
    fk_order BIGINT NOT NULL REFERENCES tb_order(pk_order),
    product_id UUID NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL
);
-- Views with JSONB data column
CREATE VIEW v_order AS
SELECT
    o.id,
    jsonb_build_object(
        'id', o.id::text,
        'identifier', o.identifier,
        'customer_id', o.customer_id::text,
        'total', o.total,
        'status', o.status,
        'created_at', o.created_at,
        'updated_at', o.updated_at
    ) AS data
FROM tb_order o;

The FraiseQL service declares types, queries, mutations, and subscriptions. NATS publishing happens automatically: the SQL mutation function calls pg_notify, and FraiseQL observers forward those notifications to NATS subjects as configured in fraiseql.toml.
import fraiseql
from fraiseql.scalars import ID
from decimal import Decimal
from datetime import datetime

# ==================== TYPES ====================


@fraiseql.type
class Order:
    id: ID
    customer_id: ID
    total: Decimal
    status: str  # 'pending', 'confirmed', 'shipped', 'delivered', 'cancelled'
    created_at: datetime
    updated_at: datetime


@fraiseql.type
class OrderItem:
    id: ID
    order_id: ID
    product_id: ID
    quantity: int
    unit_price: Decimal


@fraiseql.type
class Reservation:
    id: ID
    order_id: ID
    product_id: ID
    quantity: int
    status: str
    created_at: datetime
    expires_at: datetime | None


@fraiseql.type
class Shipping:
    id: ID
    order_id: ID
    address: str
    status: str
    tracking_number: str | None
    estimated_delivery: datetime | None
# ==================== MUTATIONS ====================


@fraiseql.input
class OrderItemInput:
    product_id: ID
    quantity: int
    unit_price: Decimal


@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
def create_order(
    customer_id: ID,
    items: list[OrderItemInput],
    shipping_address: str,
) -> Order:
    """Create order. fn_create_order calls pg_notify; FraiseQL observers forward to NATS."""
    pass


@fraiseql.mutation(sql_source="fn_update_order_status", operation="UPDATE")
def update_order_status(id: ID, status: str) -> Order:
    """Update order status. fn_update_order_status calls pg_notify for downstream services."""
    pass
# ==================== SUBSCRIPTIONS ====================


@fraiseql.subscription(
    entity_type="Order",
    topic="order_created",
)
def order_created() -> Order:
    """Subscribe to newly created orders."""
    pass


# NOTE(review): function name says "shipped" but the topic is
# "order_status_changed" — confirm which is intended.
@fraiseql.subscription(
    entity_type="Order",
    topic="order_status_changed",
)
def order_shipped() -> Order:
    """Subscribe to order status changes."""
    pass
# ==================== QUERIES ====================
@fraiseql.querydef order(id: ID) -> Order | None: """Get order by ID.""" return fraiseql.config(sql_source="v_order")
@fraiseql.querydef orders_by_status(status: str, limit: int = 100) -> list[Order]: """Get orders by status.""" return fraiseql.config(sql_source="v_order")The mutation function uses pg_notify to trigger downstream processing. FraiseQL observers forward these notifications to NATS subjects configured in fraiseql.toml.
CREATE OR REPLACE FUNCTION fn_create_order(
    p_customer_id UUID,
    p_items JSONB,
    p_shipping_address TEXT
)
RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_id UUID;
    v_result mutation_response;
BEGIN
    -- NOTE(review): total is inserted as 0 and p_items are not persisted here;
    -- confirm that a downstream service computes totals and inserts items.
    INSERT INTO tb_order (customer_id, total, status, identifier)
    VALUES (
        p_customer_id,
        0,
        'pending',
        'ORD-' || gen_random_uuid()::text
    )
    RETURNING id INTO v_id;

    -- Notify downstream services via pg_notify.
    -- FraiseQL observers forward this to the NATS subject configured in fraiseql.toml.
    PERFORM pg_notify(
        'order_created',
        jsonb_build_object(
            'order_id', v_id,
            'customer_id', p_customer_id,
            'items', p_items,
            'shipping', p_shipping_address
        )::text
    );

    v_result.status := 'success';
    v_result.entity_id := v_id;
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;

External microservices subscribe to NATS subjects using the standard nats.py library. These are not FraiseQL services — they are standalone Python processes that react to events.
# External service — uses nats.py, not fraiseql SDK
import asyncio
import json
from datetime import datetime, timezone

import nats


async def handle_order_created(msg):
    """
    Orchestrate order processing:
    1. Request inventory reservation
    2. Request shipping quote
    3. Publish order.confirmed when ready

    `nc` (core NATS connection) and `js` (JetStream context) are module
    globals set in main().
    """
    event = json.loads(msg.data.decode())
    order_id = event["order_id"]
    items = event["items"]

    try:
        # Step 1: Request inventory service to reserve items
        response = await nc.request(
            "inventory.reserve",
            json.dumps({"order_id": order_id, "items": items}).encode(),
            timeout=5,
        )
        reservation = json.loads(response.data.decode())

        if not reservation.get("success"):
            await js.publish(
                "fraiseql.order.failed",
                json.dumps({
                    "type": "order.reservation_failed",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "order_id": order_id,
                    "reason": reservation.get("reason"),
                }).encode(),
            )
            await msg.ack()
            return

        # Step 2: Request shipping service
        response = await nc.request(
            "shipping.quote",
            json.dumps({
                "order_id": order_id,
                "address": event["shipping"],
                "items": items,
            }).encode(),
            timeout=5,
        )
        shipping = json.loads(response.data.decode())

        # Step 3: Publish confirmation
        await js.publish(
            "fraiseql.order.confirmed",
            json.dumps({
                "type": "order.confirmed",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "order_id": order_id,
                "reservation_id": reservation.get("reservation_id"),
                "shipping_id": shipping.get("shipping_id"),
                "estimated_delivery": shipping.get("estimated_delivery"),
            }).encode(),
        )

    except Exception as e:
        # Best-effort error event; the message is still acked below so the
        # orchestrator does not loop on a poison message.
        await js.publish(
            "fraiseql.order.error",
            json.dumps({
                "type": "order.error",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "order_id": order_id,
                "error": str(e),
            }).encode(),
        )

    await msg.ack()
async def main():
    """Connect to NATS, register the orchestration subscription, run forever."""
    global nc, js
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    await js.subscribe(
        "fraiseql.order.created",
        cb=handle_order_created,
        durable="orchestrator",
    )
    print("Order orchestration service running…")

    try:
        await asyncio.Event().wait()  # block until the process is terminated
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

# External service — uses nats.py, not fraiseql SDK
import asyncio
import json
from datetime import datetime, timezone
import asyncpg
import nats


class InsufficientInventoryError(Exception):
    """Raised inside the reservation transaction to force a rollback."""

    def __init__(self, product_id):
        super().__init__(product_id)
        self.product_id = product_id


async def handle_inventory_reserve(msg):
    """Handle inventory reservation request via NATS request/reply.

    All `reserved` increments happen inside one transaction. BUG FIX: the
    original replied and returned from *inside* the transaction block when an
    item had insufficient stock — a normal exit from asyncpg's transaction
    context COMMITS, so increments for earlier items were committed even
    though the reservation failed. Raising instead rolls everything back.

    `db_pool` is a module global set in main().
    """
    data = json.loads(msg.data.decode())
    order_id = data["order_id"]
    items = data["items"]

    async with db_pool.acquire() as conn:
        try:
            reservations = []

            async with conn.transaction():
                for item in items:
                    product_id = item["product_id"]
                    quantity = item["quantity"]

                    # Lock the inventory row so concurrent reservations serialize.
                    row = await conn.fetchrow(
                        """
                        SELECT pk_inventory, available
                        FROM tb_inventory
                        WHERE fk_product = (
                            SELECT pk_product FROM tb_product WHERE id = $1::uuid
                        )
                        AND available >= $2
                        FOR UPDATE
                        """,
                        product_id,
                        quantity,
                    )

                    if not row:
                        # Raising exits the transaction with an error, rolling
                        # back any increments already made for earlier items.
                        raise InsufficientInventoryError(product_id)

                    await conn.execute(
                        "UPDATE tb_inventory SET reserved = reserved + $1 WHERE pk_inventory = $2",
                        quantity,
                        row["pk_inventory"],
                    )
                    reservations.append({"product_id": product_id, "quantity": quantity})

                reservation_id = str(await conn.fetchval(
                    """
                    INSERT INTO tb_reservation (order_id, status, identifier)
                    VALUES ($1::uuid, 'reserved', 'RES-' || gen_random_uuid())
                    RETURNING id
                    """,
                    order_id,
                ))

            # Reply only after the transaction has committed.
            await msg.respond(
                json.dumps({
                    "success": True,
                    "reservation_id": reservation_id,
                    "items": reservations,
                }).encode()
            )

        except InsufficientInventoryError as e:
            await msg.respond(
                json.dumps({
                    "success": False,
                    "reason": f"Insufficient inventory for {e.product_id}",
                }).encode()
            )
        except Exception as e:
            await msg.respond(
                json.dumps({"success": False, "reason": str(e)}).encode()
            )
async def main():
    """Open the DB pool and NATS connection, serve reservation requests."""
    global db_pool, nc
    db_pool = await asyncpg.create_pool("postgresql://localhost/inventory_db")
    nc = await nats.connect("nats://localhost:4222")

    await nc.subscribe("inventory.reserve", cb=handle_inventory_reserve)
    print("Inventory service running…")

    try:
        await asyncio.Event().wait()  # block until the process is terminated
    finally:
        await nc.close()
        await db_pool.close()


if __name__ == "__main__":
    asyncio.run(main())

# External service — uses nats.py, not fraiseql SDK
import asyncio
import json
import nats


async def process_order_event(msg):
    """
    Process all order events for analytics.

    A queue group load-balances each delivery to exactly one replica, but
    semantics remain at-least-once — handlers must tolerate duplicates.
    (NATS queue groups do not provide exactly-once processing.)
    """
    event = json.loads(msg.data.decode())
    event_type = event.get("type")
    order_id = event.get("order_id")  # NOTE(review): currently unused — confirm intent

    # increment_metric is assumed to be defined elsewhere in this service.
    if event_type == "order.created":
        await increment_metric("orders.created", 1)
        await increment_metric("orders.total_revenue", float(event.get("total", 0)))
    elif event_type == "order.confirmed":
        await increment_metric("orders.confirmed", 1)
    elif event_type == "order.error":
        error = event.get("error", "unknown")
        await increment_metric(f"orders.errors.{error}", 1)
        await increment_metric("orders.failed", 1)

    await msg.ack()
async def main():
    """Subscribe the analytics queue group to all order events."""
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Queue group load-balances deliveries across replicas; delivery is
    # still at-least-once, not exactly-once.
    await js.subscribe(
        "fraiseql.order.>",
        cb=process_order_event,
        durable="analytics",
        queue="analytics",
    )
    print("Analytics service running…")

    try:
        await asyncio.Event().wait()  # block until the process is terminated
    finally:
        await nc.close()
if __name__ == "__main__":
    asyncio.run(main())

{
  "type": "order.created",
  "timestamp": "2024-01-15T10:30:00Z",
  "order_id": "550e8400-e29b-41d4-a716-446655440000",
  "customer_id": "client-123",
  "total": "1234.56",
  "data": {
    "customer_id": "client-123",
    "items": [
      {
        "product_id": "prod-001",
        "quantity": 2,
        "unit_price": "99.99",
        "weight": 0.5
      }
    ],
    "shipping_address": {
      "street": "123 Main St",
      "city": "San Francisco",
      "postal": "94102",
      "country": "USA"
    }
  }
}

{
  "type": "order.status_changed",
  "timestamp": "2024-01-15T10:31:15Z",
  "order_id": "550e8400-e29b-41d4-a716-446655440000",
  "new_status": "confirmed",
  "previous_status": "pending",
  "data": {
    "reservation_id": "res-456",
    "shipping_id": "ship-789"
  }
}

# idempotent_handler.py — external service using nats.py
import asyncio
import hashlib
import json

import nats
import redis.asyncio as redis_client
async def handle_order_created_idempotent(msg):
    """Process event idempotently using a deterministic event ID.

    `redis` is a module global set in main(); `process_order` is assumed to
    be defined elsewhere in this service.
    """
    event = json.loads(msg.data.decode())

    # Deterministic event ID: same order + timestamp always hashes the same,
    # so redeliveries are recognized as duplicates.
    event_id = hashlib.sha256(
        f"{event['order_id']}-{event['timestamp']}".encode()
    ).hexdigest()

    # Already processed — ack and skip.
    if await redis.exists(f"event_processed:{event_id}"):
        await msg.ack()
        return

    try:
        await process_order(event)
        # Mark processed with a TTL of 30 days.
        # NOTE(review): if setex fails after process_order succeeded, the
        # event will be redelivered and reprocessed — confirm process_order
        # itself tolerates that.
        await redis.setex(f"event_processed:{event_id}", 30 * 24 * 3600, "true")
        await msg.ack()
    except Exception:
        raise  # Let NATS retry (max_deliver = 3)
async def main():
    """Connect Redis + NATS and run the idempotent consumer forever."""
    global redis
    redis = redis_client.from_url("redis://localhost:6379")
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    await js.subscribe(
        "fraiseql.order.created",
        cb=handle_order_created_idempotent,
        durable="idempotent-processor",
    )

    try:
        await asyncio.Event().wait()  # block until the process is terminated
    finally:
        await nc.close()
        await redis.close()


if __name__ == "__main__":
    asyncio.run(main())

# dlq_handler.py — external service using nats.py
import asyncio
import json
from datetime import datetime, timezone
import nats


async def handle_dead_letter(msg):
    """Handle messages that failed max_deliver times.

    `nc` is a module global set in main(); log_dead_letter, notify_ops and
    should_manual_retry are assumed to be defined elsewhere in this service.
    """
    event = json.loads(msg.data.decode())

    # Persist the failure for later inspection.
    await log_dead_letter({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event,
        "reason": "Max retries exceeded",
    })

    # Alert the on-call channel.
    await notify_ops(f"Dead letter: {event.get('order_id')}")

    # Optionally requeue for human review.
    if should_manual_retry(event):
        js = nc.jetstream()
        await js.publish(
            "fraiseql.order.manual_review",
            json.dumps(event).encode(),
        )

    await msg.ack()
async def main(): global nc nc = await nats.connect("nats://localhost:4222") js = nc.jetstream()
await js.subscribe("fraiseql.dlq", cb=handle_dead_letter, durable="dlq-handler")
try: await asyncio.Event().wait() finally: await nc.close()
if __name__ == "__main__": asyncio.run(main())# Check JetStream statusnats account info
# Monitor consumers
nats consumer list orders

# Check consumer state
nats consumer info orders order-processor

# View pending messages
nats consumer info orders order-processor --raw | jq '.state.pending'

# Check for slow consumers
nats consumer report orders order-processor

NATS Integration
Complete NATS reference and configuration.
Federation
Synchronous cross-database access.
Subscriptions
GraphQL subscriptions over WebSocket.
Error Handling
Handle failures and retries gracefully.