NATS Event Pipeline Example

A complete event-driven order processing system using FraiseQL’s NATS integration with JetStream persistence, CDC (Change Data Capture), and multiple microservices.

NATS event pipeline: order processing with JetStream fan-out to inventory, payment, and shipping services
FraiseQL publishes events to NATS JetStream; downstream services subscribe and process independently.

Separation of concerns:

  • FraiseQL service — declares GraphQL schema (@fraiseql.type, @fraiseql.query, @fraiseql.mutation). The SQL function calls pg_notify, which FraiseQL observers forward to NATS automatically based on fraiseql.toml configuration.
  • External microservices — written in plain Python using the nats.py library. These subscribe to NATS subjects and handle downstream processing. They are not FraiseQL services.

In fraiseql.toml, enable NATS as the observer backend:

fraiseql.toml
# Global observer backend settings
[observers]
backend = "nats"
nats_url = "${NATS_URL}" # e.g. nats://nats-1:4222,nats://nats-2:4222

# Per-table observer entries (array of tables/events to watch;
# the nested key name may vary by FraiseQL version)
[[observers.events]]
table = "tb_order"
event = "INSERT"
subject = "order_created"

[[observers.events]]
table = "tb_order"
event = "UPDATE"
subject = "order_status_changed"

Advanced JetStream settings (stream retention, consumers, DLQ) are configured in the observer runtime config file (fraiseql-observer.toml):

fraiseql-observer.toml
[transport]
transport = "nats"

[transport.nats]
url = "${NATS_URL}"
stream_name = "fraiseql_events"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"

[transport.nats.jetstream]
max_age_days = 30
max_bytes = 10_737_418_240  # 10 GB
max_msgs = 1_000_000
max_deliver = 3
ack_wait_secs = 30
dedup_window_minutes = 5
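The dedup_window_minutes setting corresponds to JetStream's duplicate-tracking window, which keys on the Nats-Msg-Id header: a second publish carrying the same ID inside the window is dropped by the server. A hedged sketch of publishing with an explicit message ID via nats.py (the subject shown assumes subject_prefix plus the per-table subject name; publish_once and dedup_headers are illustrative helpers, not FraiseQL APIs):

```python
import asyncio
import json


def dedup_headers(msg_id: str) -> dict:
    # JetStream drops a second publish carrying the same Nats-Msg-Id
    # within the stream's duplicate window (dedup_window_minutes above).
    return {"Nats-Msg-Id": msg_id}


async def publish_once(js, subject: str, event: dict, msg_id: str):
    await js.publish(subject, json.dumps(event).encode(), headers=dedup_headers(msg_id))


async def main():
    import nats  # third-party: pip install nats-py

    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await publish_once(js, "fraiseql.mutation.order_created",
                       {"order_id": "demo-1"}, msg_id="order-demo-1")
    await nc.close()
```

Run with `asyncio.run(main())`. A stable, deterministic msg_id (e.g. derived from the row's UUID) is what makes retried publishes safe.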
The database schema: base tables plus a view that exposes each order as a JSONB data column.

CREATE TABLE tb_order (
    pk_order BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT NOT NULL UNIQUE,
    customer_id UUID NOT NULL,
    total DECIMAL(10, 2) NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE tb_order_item (
    pk_order_item BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT NOT NULL UNIQUE,
    fk_order BIGINT NOT NULL REFERENCES tb_order(pk_order),
    product_id UUID NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL
);

-- View with JSONB data column
CREATE VIEW v_order AS
SELECT
    o.id,
    jsonb_build_object(
        'id', o.id::text,
        'identifier', o.identifier,
        'customer_id', o.customer_id::text,
        'total', o.total,
        'status', o.status,
        'created_at', o.created_at,
        'updated_at', o.updated_at
    ) AS data
FROM tb_order o;

The FraiseQL service declares types, queries, mutations, and subscriptions. NATS publishing happens automatically: the SQL mutation function calls pg_notify, and FraiseQL observers forward those notifications to NATS subjects as configured in fraiseql.toml.

import fraiseql
from fraiseql.scalars import ID
from decimal import Decimal
from datetime import datetime


# ==================== TYPES ====================

@fraiseql.type
class Order:
    id: ID
    customer_id: ID
    total: Decimal
    status: str  # 'pending', 'confirmed', 'shipped', 'delivered', 'cancelled'
    created_at: datetime
    updated_at: datetime


@fraiseql.type
class OrderItem:
    id: ID
    order_id: ID
    product_id: ID
    quantity: int
    unit_price: Decimal


@fraiseql.type
class Reservation:
    id: ID
    order_id: ID
    product_id: ID
    quantity: int
    status: str
    created_at: datetime
    expires_at: datetime | None


@fraiseql.type
class Shipping:
    id: ID
    order_id: ID
    address: str
    status: str
    tracking_number: str | None
    estimated_delivery: datetime | None


# ==================== MUTATIONS ====================

@fraiseql.input
class OrderItemInput:
    product_id: ID
    quantity: int
    unit_price: Decimal


@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
def create_order(
    customer_id: ID,
    items: list[OrderItemInput],
    shipping_address: str,
) -> Order:
    """Create order. fn_create_order calls pg_notify; FraiseQL observers forward to NATS."""
    pass


@fraiseql.mutation(sql_source="fn_update_order_status", operation="UPDATE")
def update_order_status(id: ID, status: str) -> Order:
    """Update order status. fn_update_order_status calls pg_notify for downstream services."""
    pass


# ==================== SUBSCRIPTIONS ====================

@fraiseql.subscription(
    entity_type="Order",
    topic="order_created",
)
def order_created() -> Order:
    """Subscribe to newly created orders."""
    pass


@fraiseql.subscription(
    entity_type="Order",
    topic="order_status_changed",
)
def order_status_changed() -> Order:
    """Subscribe to order status changes."""
    pass


# ==================== QUERIES ====================

@fraiseql.query
def order(id: ID) -> Order | None:
    """Get order by ID."""
    return fraiseql.config(sql_source="v_order")


@fraiseql.query
def orders_by_status(status: str, limit: int = 100) -> list[Order]:
    """Get orders by status."""
    return fraiseql.config(sql_source="v_order")
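With the schema above, creating an order is an ordinary GraphQL mutation over HTTP. A hedged sketch of the request body a client would send (the camelCase field names assume the usual snake_case-to-GraphQL conversion; the request_body helper is illustrative, not part of FraiseQL):

```python
import json

# GraphQL document matching the create_order mutation declared above.
CREATE_ORDER = """
mutation CreateOrder($customerId: ID!, $items: [OrderItemInput!]!, $shippingAddress: String!) {
  createOrder(customerId: $customerId, items: $items, shippingAddress: $shippingAddress) {
    id
    status
  }
}
"""


def request_body(customer_id: str, items: list[dict], shipping_address: str) -> str:
    # Standard GraphQL-over-HTTP envelope: {"query": ..., "variables": ...}
    return json.dumps({
        "query": CREATE_ORDER,
        "variables": {
            "customerId": customer_id,
            "items": items,
            "shippingAddress": shipping_address,
        },
    })


body = request_body(
    "client-123",
    [{"productId": "prod-001", "quantity": 2, "unitPrice": "99.99"}],
    "123 Main St",
)
```

POSTing this body to the service's GraphQL endpoint runs fn_create_order, which in turn fires the pg_notify that starts the pipeline.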

The mutation function uses pg_notify to trigger downstream processing. FraiseQL observers forward these notifications to NATS subjects configured in fraiseql.toml.

db/schema/03_functions/fn_create_order.sql
CREATE OR REPLACE FUNCTION fn_create_order(
    p_customer_id UUID,
    p_items JSONB,
    p_shipping_address TEXT
)
RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_id UUID;
    v_result mutation_response;
BEGIN
    INSERT INTO tb_order (customer_id, total, status, identifier)
    VALUES (
        p_customer_id,
        0,
        'pending',
        'ORD-' || gen_random_uuid()::text
    )
    RETURNING id INTO v_id;

    -- Notify downstream services via pg_notify.
    -- FraiseQL observers forward this to the NATS subject configured in fraiseql.toml.
    PERFORM pg_notify(
        'order_created',
        jsonb_build_object(
            'order_id', v_id,
            'customer_id', p_customer_id,
            'items', p_items,
            'shipping', p_shipping_address
        )::text
    );

    v_result.status := 'success';
    v_result.entity_id := v_id;
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;
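The pg_notify side can be checked before any observer or NATS infrastructure exists. A minimal listener sketch using psycopg 3 (the connection string is illustrative); the parse step mirrors the jsonb_build_object payload above:

```python
import json


def parse_order_event(payload: str) -> dict:
    # The payload is the jsonb_build_object(...) text from fn_create_order;
    # fail fast if a required key is missing.
    event = json.loads(payload)
    for key in ("order_id", "customer_id", "items", "shipping"):
        if key not in event:
            raise ValueError(f"missing key: {key}")
    return event


def listen_forever(dsn: str = "dbname=app"):
    import psycopg  # third-party: pip install psycopg

    # autocommit is required so LISTEN/NOTIFY delivery isn't held in a transaction
    with psycopg.connect(dsn, autocommit=True) as conn:
        conn.execute("LISTEN order_created")
        for notice in conn.notifies():
            event = parse_order_event(notice.payload)
            print("order created:", event["order_id"])
```

Calling fn_create_order in another session should make the listener print the new order's UUID within a second or so.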

External microservices subscribe to NATS subjects using the standard nats.py library. These are not FraiseQL services — they are standalone Python processes that react to events.

order_orchestration_service.py
# External service — uses nats.py, not fraiseql SDK
import asyncio
import json
from datetime import datetime, timezone

import nats


async def handle_order_created(msg):
    """
    Orchestrate order processing:
    1. Request inventory reservation
    2. Request shipping quote
    3. Publish order.confirmed when ready
    """
    event = json.loads(msg.data.decode())
    order_id = event["order_id"]
    items = event["items"]
    try:
        # Step 1: Ask the inventory service to reserve items
        response = await nc.request(
            "inventory.reserve",
            json.dumps({"order_id": order_id, "items": items}).encode(),
            timeout=5,
        )
        reservation = json.loads(response.data.decode())
        if not reservation.get("success"):
            await js.publish(
                "fraiseql.order.failed",
                json.dumps({
                    "type": "order.reservation_failed",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "order_id": order_id,
                    "reason": reservation.get("reason"),
                }).encode(),
            )
            await msg.ack()
            return
        # Step 2: Ask the shipping service for a quote
        response = await nc.request(
            "shipping.quote",
            json.dumps({
                "order_id": order_id,
                "address": event["shipping"],
                "items": items,
            }).encode(),
            timeout=5,
        )
        shipping = json.loads(response.data.decode())
        # Step 3: Publish confirmation
        await js.publish(
            "fraiseql.order.confirmed",
            json.dumps({
                "type": "order.confirmed",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "order_id": order_id,
                "reservation_id": reservation.get("reservation_id"),
                "shipping_id": shipping.get("shipping_id"),
                "estimated_delivery": shipping.get("estimated_delivery"),
            }).encode(),
        )
    except Exception as e:
        await js.publish(
            "fraiseql.order.error",
            json.dumps({
                "type": "order.error",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "order_id": order_id,
                "error": str(e),
            }).encode(),
        )
    await msg.ack()


async def main():
    global nc, js
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.subscribe("fraiseql.order.created", cb=handle_order_created, durable="orchestrator")
    print("Order orchestration service running…")
    try:
        await asyncio.Event().wait()
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
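The orchestrator's nc.request calls assume something is answering on inventory.reserve and shipping.quote. A minimal stub for the inventory side (the reply keys match what handle_order_created reads: success, reservation_id, reason; the stock check is a placeholder):

```python
import asyncio
import json


def reservation_reply(order_id: str, in_stock: bool) -> bytes:
    # Keys the orchestrator reads from the reply: success, reservation_id / reason.
    if in_stock:
        return json.dumps({"success": True, "reservation_id": f"res-{order_id}"}).encode()
    return json.dumps({"success": False, "reason": "insufficient stock"}).encode()


async def main():
    import nats  # third-party: pip install nats-py

    nc = await nats.connect("nats://localhost:4222")

    async def on_reserve(msg):
        req = json.loads(msg.data.decode())
        # Core NATS request/reply: respond on the caller's reply inbox.
        await msg.respond(reservation_reply(req["order_id"], in_stock=True))

    await nc.subscribe("inventory.reserve", cb=on_reserve)
    await asyncio.Event().wait()
```

Run with `asyncio.run(main())`; a real implementation would check stock levels and create a reservation row before replying.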
An example order.created event payload as downstream services receive it:

{
  "type": "order.created",
  "timestamp": "2024-01-15T10:30:00Z",
  "order_id": "550e8400-e29b-41d4-a716-446655440000",
  "customer_id": "client-123",
  "total": "1234.56",
  "data": {
    "customer_id": "client-123",
    "items": [
      {
        "product_id": "prod-001",
        "quantity": 2,
        "unit_price": "99.99",
        "weight": 0.5
      }
    ],
    "shipping_address": {
      "street": "123 Main St",
      "city": "San Francisco",
      "postal": "94102",
      "country": "USA"
    }
  }
}
# idempotent_handler.py — external service using nats.py
import asyncio
import hashlib
import json

import nats
import redis.asyncio as redis_client


async def handle_order_created_idempotent(msg):
    """Process event idempotently using a deterministic event ID."""
    event = json.loads(msg.data.decode())
    # Create deterministic event ID
    event_id = hashlib.sha256(
        f"{event['order_id']}-{event['timestamp']}".encode()
    ).hexdigest()
    # Check if already processed
    if await redis.exists(f"event_processed:{event_id}"):
        await msg.ack()
        return
    try:
        await process_order(event)  # app-specific business logic (not shown)
        # Store with TTL of 30 days
        await redis.setex(f"event_processed:{event_id}", 30 * 24 * 3600, "true")
        await msg.ack()
    except Exception:
        raise  # Don't ack: JetStream redelivers (up to max_deliver = 3)


async def main():
    global redis
    redis = redis_client.from_url("redis://localhost:6379")
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.subscribe(
        "fraiseql.order.created",
        cb=handle_order_created_idempotent,
        durable="idempotent-processor",
    )
    try:
        await asyncio.Event().wait()
    finally:
        await nc.close()
        await redis.close()


if __name__ == "__main__":
    asyncio.run(main())
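The deduplication key above can be exercised without Redis or NATS: the hash is a pure function of the event, so a redelivered message always maps to the same key.

```python
import hashlib


def event_key(order_id: str, timestamp: str) -> str:
    # Same derivation as handle_order_created_idempotent:
    # SHA-256 over "order_id-timestamp".
    return hashlib.sha256(f"{order_id}-{timestamp}".encode()).hexdigest()


first = event_key("550e8400-e29b-41d4-a716-446655440000", "2024-01-15T10:30:00Z")
replay = event_key("550e8400-e29b-41d4-a716-446655440000", "2024-01-15T10:30:00Z")
assert first == replay  # a redelivery hits the existing Redis entry
assert len(first) == 64  # hex-encoded SHA-256
```

Hashing order_id together with the event timestamp means a genuinely new event for the same order (a later status change) gets a fresh key, while a redelivery of the same event does not.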
# dlq_handler.py — external service using nats.py
import asyncio
import json
from datetime import datetime, timezone

import nats


async def handle_dead_letter(msg):
    """Handle messages that failed max_deliver times."""
    event = json.loads(msg.data.decode())
    # log_dead_letter, notify_ops, and should_manual_retry are
    # app-specific helpers (not shown).
    await log_dead_letter({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event,
        "reason": "Max retries exceeded",
    })
    await notify_ops(f"Dead letter: {event.get('order_id')}")
    if should_manual_retry(event):
        js = nc.jetstream()
        await js.publish(
            "fraiseql.order.manual_review",
            json.dumps(event).encode(),
        )
    await msg.ack()


async def main():
    global nc
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.subscribe("fraiseql.dlq", cb=handle_dead_letter, durable="dlq-handler")
    try:
        await asyncio.Event().wait()
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
Terminal window
# Check JetStream status
nats account info
# Monitor consumers on the event stream (stream_name from fraiseql-observer.toml)
nats consumer list fraiseql_events
# Check consumer state
nats consumer info fraiseql_events orchestrator
# View pending messages
nats consumer info fraiseql_events orchestrator --raw | jq '.state.pending'
# Check for slow consumers
nats consumer report fraiseql_events
  1. Keep events small - Publish minimal data, clients query for details
  2. Use JetStream - Enable persistence for critical events
  3. Idempotent handlers - Design for at-least-once delivery
  4. Dead letter queues - Monitor and handle failed messages
  5. Partition by key - Maintain ordering within partitions
  6. ACK explicitly - Only ACK after successful processing
  7. Circuit breakers - Fail gracefully when services are down
  8. Replay capability - Design for time-travel debugging

NATS Integration

Complete NATS reference and configuration.

Federation

Synchronous cross-database access.

Subscriptions

GraphQL subscriptions over WebSocket.

Error Handling

Handle failures and retries gracefully.