Skip to content

NATS Event Pipeline Example

A complete event-driven order processing system using FraiseQL’s NATS integration with JetStream persistence, CDC (Change Data Capture), and multiple microservices.

GraphQL Client
|
FraiseQL API
|
+------+------+
| |
Orders DB NATS JetStream
| |
+------+------+
|
+-----+-----+
| | |
Inventory Payment Shipping
Service Service Service
| | |
Inv DB Pay DB Ship DB

In fraiseql.toml, enable NATS as the observer backend:

fraiseql.toml
[observers]
enabled = true
backend = "nats"
nats_url = "${NATS_URL}" # e.g. nats://nats-1:4222,nats://nats-2:4222
[[observers.handlers]]
name = "order-pipeline"
event = "order.created"
action = "webhook"
webhook_url = "${ORDER_PROCESSOR_URL}"

Advanced JetStream settings (stream retention, consumers, DLQ) are configured in the observer runtime config file (fraiseql-observer.toml):

fraiseql-observer.toml
[transport]
transport = "nats"
[transport.nats]
url = "${NATS_URL}"
stream_name = "fraiseql_events"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"
[transport.nats.jetstream]
max_age_days = 30
max_bytes = 10_737_418_240 # 10 GB
max_msgs = 1_000_000
max_deliver = 3
ack_wait_secs = 30
dedup_window_minutes = 5
CREATE TABLE tb_order (
pk_order SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
customer_id UUID NOT NULL,
total DECIMAL(10, 2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE tb_order_item (
pk_order_item SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
fk_order INTEGER NOT NULL REFERENCES tb_order(pk_order),
product_id UUID NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL
);
-- Views with JSONB data column
CREATE VIEW v_order AS
SELECT
o.id,
o.identifier,
jsonb_build_object(
'id', o.id::text,
'identifier', o.identifier,
'customer_id', o.customer_id::text,
'total', o.total,
'status', o.status,
'created_at', o.created_at,
'updated_at', o.updated_at
) AS data
FROM tb_order o;

FraiseQL Schema with CDC and Subscriptions

Section titled “FraiseQL Schema with CDC and Subscriptions”
import fraiseql
from fraiseql.scalars import ID
from fraiseql.nats import publish
from typing import Optional, Any
from decimal import Decimal
from datetime import datetime
# ==================== TYPES ====================
@fraiseql.type
class Order:
id: ID
customer_id: ID
total: Decimal
status: str # 'pending', 'confirmed', 'shipped', 'delivered', 'cancelled'
created_at: datetime
updated_at: datetime
@fraiseql.type
class OrderItem:
id: ID
order_id: ID
product_id: ID
quantity: int
unit_price: Decimal
@fraiseql.type
class Reservation:
id: ID
order_id: ID
product_id: ID
quantity: int
status: str
created_at: datetime
expires_at: Optional[datetime]
@fraiseql.type
class Shipping:
id: ID
order_id: ID
address: str
status: str
tracking_number: Optional[str]
estimated_delivery: Optional[datetime]
@fraiseql.type
class OrderEvent:
"""Base event structure for all order events."""
type: str
timestamp: datetime
order_id: ID
data: Any
# ==================== MUTATIONS ====================
@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
async def create_order(
customer_id: ID,
items: list[dict],
shipping_address: dict
) -> Order:
"""Create order and publish event to NATS."""
pass
# Publish custom event after order creation
@fraiseql.after_mutation("create_order")
async def after_create_order(order: Order, context):
"""Publish order created event for other services."""
await publish(
subject="fraiseql.order.created",
data={
"type": "order.created",
"timestamp": datetime.utcnow().isoformat(),
"order_id": order.id,
"customer_id": order.customer_id,
"total": str(order.total),
"data": {
"customer_id": order.customer_id,
"items": context.variables.get("items"),
"shipping_address": context.variables.get("shipping_address")
}
}
)
@fraiseql.mutation(sql_source="fn_update_order_status", operation="UPDATE")
async def update_order_status(id: ID, status: str) -> Order:
"""Update order status and publish event."""
pass
@fraiseql.after_mutation("update_order_status")
async def after_update_order_status(order: Order):
"""Publish status changed event."""
await publish(
subject="fraiseql.order.status_changed",
data={
"type": "order.status_changed",
"timestamp": datetime.utcnow().isoformat(),
"order_id": order.id,
"new_status": order.status
}
)
# ==================== SUBSCRIPTIONS ====================
@fraiseql.subscription(
entity_type="Order",
topic="order_created",
jetstream=True,
replay=False # Only new orders
)
def order_created() -> Order:
"""Subscribe to newly created orders."""
pass
@fraiseql.subscription(
entity_type="Order",
topic="order_status_changed",
jetstream=True,
filter="new_status == 'shipped'"
)
def order_shipped() -> Order:
"""Subscribe only to shipped orders."""
pass
@fraiseql.subscription(
entity_type="Order",
topic="order_status_changed",
jetstream=True,
replay=True,
replay_from="2024-01-01T00:00:00Z"
)
def order_status_history() -> Order:
"""Replay all status changes from a point in time."""
pass
# ==================== QUERIES ====================
@fraiseql.query(sql_source="v_order")
def order(id: ID) -> Order:
"""Get order by ID."""
pass
@fraiseql.query(sql_source="v_order")
def orders_by_status(status: str, limit: int = 100) -> list[Order]:
"""Get orders by status."""
pass
from fraiseql.nats import subscribe, publish, request
import asyncio
@subscribe("fraiseql.order.created")
async def handle_order_created(event: dict):
"""
Orchestrate order processing:
1. Request inventory reservation
2. Request shipping quote
3. Publish order.confirmed when ready
"""
order_id = event["data"]["order_id"]
items = event["data"]["items"]
try:
# Step 1: Request inventory service to reserve items
reservation = await request(
subject="inventory.reserve",
data={"order_id": order_id, "items": items},
timeout=5000
)
if not reservation.get("success"):
await publish(
subject="fraiseql.order.failed",
data={
"type": "order.reservation_failed",
"timestamp": datetime.utcnow().isoformat(),
"order_id": order_id,
"reason": reservation.get("reason")
}
)
return
# Step 2: Request shipping service
shipping = await request(
subject="shipping.quote",
data={
"order_id": order_id,
"address": event["data"]["shipping_address"],
"items": items
},
timeout=5000
)
# Step 3: Publish confirmation
await publish(
subject="fraiseql.order.confirmed",
data={
"type": "order.confirmed",
"timestamp": datetime.utcnow().isoformat(),
"order_id": order_id,
"reservation_id": reservation.get("reservation_id"),
"shipping_id": shipping.get("shipping_id"),
"estimated_delivery": shipping.get("estimated_delivery")
}
)
except Exception as e:
await publish(
subject="fraiseql.order.error",
data={
"type": "order.error",
"timestamp": datetime.utcnow().isoformat(),
"order_id": order_id,
"error": str(e)
}
)
{
"type": "order.created",
"timestamp": "2024-01-15T10:30:00Z",
"order_id": "550e8400-e29b-41d4-a716-446655440000",
"customer_id": "client-123",
"total": "1234.56",
"data": {
"customer_id": "client-123",
"items": [
{
"product_id": "prod-001",
"quantity": 2,
"unit_price": "99.99",
"weight": 0.5
}
],
"shipping_address": {
"street": "123 Main St",
"city": "San Francisco",
"postal": "94102",
"country": "USA"
}
}
}
from fraiseql.nats import subscribe
import hashlib
@subscribe("fraiseql.order.created")
async def handle_order_created_idempotent(event: dict):
"""Process event idempotently using event ID."""
# Create deterministic event ID
event_id = hashlib.sha256(
f"{event['order_id']}-{event['timestamp']}".encode()
).hexdigest()
# Check if already processed
if await redis.exists(f"event_processed:{event_id}"):
return
try:
await process_order(event)
# Store with TTL of 30 days
await redis.setex(f"event_processed:{event_id}", 30 * 24 * 3600, "true")
except Exception:
raise # Let NATS retry (max_deliver = 3)
from fraiseql.nats import subscribe
@subscribe("fraiseql.dlq")
async def handle_dead_letter(event: dict):
"""Handle messages that failed 3 times."""
await log_dead_letter({
"timestamp": datetime.utcnow(),
"event": event,
"reason": "Max retries exceeded"
})
await notify_ops(f"Dead letter: {event.get('order_id')}")
if should_manual_retry(event):
await publish(
subject="fraiseql.order.manual_review",
data=event
)
Terminal window
# Check JetStream status
nats account info
# Monitor consumers
nats consumer list orders
# Check consumer state
nats consumer info orders order-processor
# View pending messages
nats consumer info orders order-processor --raw | jq '.state.pending'
# Check for slow consumers
nats consumer report orders order-processor
  1. Keep events small - Publish minimal data, clients query for details
  2. Use JetStream - Enable persistence for critical events
  3. Idempotent handlers - Design for at-least-once delivery
  4. Dead letter queues - Monitor and handle failed messages
  5. Partition by key - Maintain ordering within partitions
  6. ACK explicitly - Only ACK after successful processing
  7. Circuit breakers - Fail gracefully when services are down
  8. Replay capability - Design for time-travel debugging

NATS Integration

Complete NATS reference and configuration.

Federation

Synchronous cross-database access.

Subscriptions

GraphQL subscriptions over WebSocket.

Error Handling

Handle failures and retries gracefully.