
Microservices Choreography with NATS

A complete microservices architecture using event-driven choreography with separate FraiseQL services and NATS for distributed order processing.

Microservices choreography: event-driven chain from order through inventory, payment, and shipping.
Each service publishes a NATS event that triggers the next step — no central orchestrator.
sequenceDiagram
participant C as Client
participant OS as Order Service (FraiseQL)
participant N as NATS
participant INV as Inventory Worker (nats.py)
participant PAY as Payment Worker (nats.py)
participant SHP as Shipping Worker (nats.py)
C->>OS: createOrder(items, total)
OS->>OS: fn_create_order() → status "pending"
OS->>N: events.order.created (via pg_notify → observer)
OS-->>C: Order{status: "pending"}
N->>INV: events.order.created
INV->>INV: reserve stock in tb_inventory
INV->>N: events.inventory.reserved
N->>PAY: events.inventory.reserved
PAY->>PAY: charge card, insert tb_payment
PAY->>N: events.payment.charged
N->>OS: events.payment.charged
OS->>OS: fn_update_order_status() → "paid"
N->>SHP: events.payment.charged
SHP->>SHP: create label, insert tb_shipping
SHP->>N: events.order.shipped
N->>OS: events.order.shipped
OS->>OS: fn_update_order_status() → "shipped"
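The diagram's happy path and failure path boil down to a small table of status transitions. As a quick mental model (a pure-Python sketch — the table and `apply_event` are illustrative, not part of any service):

```python
# Order status transitions driven by choreography events.
# Event subjects and statuses mirror the sequence diagram above.
TRANSITIONS = {
    ("pending", "events.inventory.reserved"): "pending",  # reservation alone does not change status
    ("pending", "events.payment.charged"): "paid",
    ("paid", "events.order.shipped"): "shipped",
    ("pending", "events.order.failed"): "failed",
    ("pending", "events.payment.failed"): "failed",
}


def apply_event(status: str, subject: str) -> str:
    """Return the next order status, or raise on an illegal transition."""
    try:
        return TRANSITIONS[(status, subject)]
    except KeyError:
        raise ValueError(f"event {subject!r} not valid in status {status!r}")


status = "pending"
for subject in ("events.payment.charged", "events.order.shipped"):
    status = apply_event(status, subject)
# status is now "shipped"
```

Keeping the legal transitions explicit like this makes it easy to spot out-of-order or duplicate deliveries when debugging the workers.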
docker-compose.yml
version: '3.9'

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: orders
      POSTGRES_USER: fraiseql
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U fraiseql -d orders"]
      interval: 5s
      retries: 10

  nats:
    image: nats:latest
    command: ["-js"]
    ports:
      - "4222:4222"

  order-service:
    build: ./order-service
    depends_on: [postgres, nats]
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

  inventory-worker:
    build: ./inventory-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

  payment-worker:
    build: ./payment-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

  shipping-worker:
    build: ./shipping-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

Start all services with:

docker-compose up -d
CREATE TABLE tb_order (
    pk_order BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    fk_user BIGINT NOT NULL REFERENCES tb_user(pk_user),
    total NUMERIC(10, 2) NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- The UNIQUE constraint on id already creates an index,
-- so no separate index on tb_order(id) is needed.
CREATE INDEX idx_tb_order_fk_user ON tb_order(fk_user);
CREATE INDEX idx_tb_order_status ON tb_order(status);

CREATE TABLE tb_order_item (
    pk_order_item BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    fk_order BIGINT NOT NULL REFERENCES tb_order(pk_order),
    fk_product BIGINT NOT NULL REFERENCES tb_product(pk_product),
    quantity INTEGER NOT NULL,
    unit_price NUMERIC(10, 2) NOT NULL
);

CREATE INDEX idx_tb_order_item_fk_order ON tb_order_item(fk_order);

CREATE TABLE tb_event_log (
    pk_event_log BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    event_type TEXT NOT NULL,
    fk_order BIGINT REFERENCES tb_order(pk_order),
    data JSONB NOT NULL DEFAULT '{}',
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_tb_event_log_fk_order ON tb_event_log(fk_order);

-- Read view: order status
CREATE VIEW v_order AS
SELECT
    o.id,
    jsonb_build_object(
        'id', o.id::text,
        'identifier', o.identifier,
        'total', o.total,
        'status', o.status,
        'created_at', o.created_at
    ) AS data
FROM tb_order o;

-- Mutation function: create order
CREATE FUNCTION fn_create_order(
    p_identifier TEXT,
    p_fk_user BIGINT,
    p_total NUMERIC
) RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_id UUID;
    v_result mutation_response;
BEGIN
    INSERT INTO tb_order (identifier, fk_user, total, status)
    VALUES (p_identifier, p_fk_user, p_total, 'pending')
    RETURNING id INTO v_id;

    v_result.status := 'success';
    v_result.entity_id := v_id;
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;

-- Mutation function: update order status (called by the NATS observer via SQL)
CREATE FUNCTION fn_update_order_status(
    p_order_id UUID,
    p_status TEXT
) RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_result mutation_response;
BEGIN
    UPDATE tb_order SET status = p_status WHERE id = p_order_id;

    v_result.status := 'success';
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;
order-service/schema.py
import fraiseql
from fraiseql.scalars import ID, DateTime
from decimal import Decimal


@fraiseql.type
class Order:
    """A customer order."""
    id: ID
    identifier: str
    total: Decimal
    status: str
    created_at: DateTime


@fraiseql.input
class CreateOrderInput:
    identifier: str
    total: Decimal


@fraiseql.query
def order(id: ID) -> Order | None:
    """Get a single order by UUID."""
    return fraiseql.config(sql_source="v_order")


@fraiseql.query
def orders(limit: int = 20, offset: int = 0) -> list[Order]:
    """List orders. RLS ensures users see only their own."""
    return fraiseql.config(sql_source="v_order")


@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
def create_order(input: CreateOrderInput) -> Order:
    """Create a new order. The Rust runtime calls fn_create_order(), pg_notify fires, and the observer publishes to NATS."""
    pass


@fraiseql.subscription(entity_type="Order", topic="events.order.status_updated")
def order_status_updated(order_id: ID | None = None) -> Order:
    """Subscribe to order status changes in real time."""
    pass


fraiseql.export_schema("schema.json")
order-service/fraiseql.toml
[project]
name = "order-service"
version = "1.0.0"
[database]
url = "${DATABASE_URL}"
[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"
# JWT/OIDC config via environment variables (no [auth] TOML section):
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET
# NATS observer: when tb_order changes, publish to NATS.
# The Rust runtime receives pg_notify from PostgreSQL and forwards to NATS.
# Shared backend settings live on the [observers] table; each routing rule is
# an entry in the [[observers.rules]] array. (A bare [observers] table cannot
# coexist with a [[observers]] array in TOML — the file would fail to parse.)
[observers]
backend = "nats"
nats_url = "${NATS_URL}"

[[observers.rules]]
table = "tb_order"
event = "INSERT"
subject = "events.order.created"

[[observers.rules]]
table = "tb_order"
event = "UPDATE"
subject = "events.order.status_updated"
[security.enterprise]
enabled = true
log_level = "info"

Inventory Worker — External nats.py Service

inventory-worker/main.py
"""
Inventory worker: subscribes to NATS events using nats.py.
This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json
import os

import asyncpg
import nats

NATS_URL = os.environ["NATS_URL"]
POSTGRES_URL = os.environ["DATABASE_URL"]

js = None  # JetStream context, set in main()


class InventoryUnavailable(Exception):
    """Raised inside the reservation transaction to roll back any partial reservations."""
    def __init__(self, sku):
        self.sku = sku


async def on_order_created(msg):
    """
    React to order creation.
    Reserve inventory or emit a failure event.
    """
    data = json.loads(msg.data)
    order_id = data["order_id"]
    items = data["items"]
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # Reserve all items in one transaction: if any item is short,
        # the reservations made so far roll back.
        async with conn.transaction():
            for item in items:
                row = await conn.fetchrow(
                    """
                    SELECT pk_inventory, quantity, reserved
                    FROM tb_inventory
                    WHERE fk_product = (
                        SELECT pk_product FROM tb_product WHERE identifier = $1
                    )
                    FOR UPDATE
                    """,
                    item["sku"],
                )
                if row is None or (row["quantity"] - row["reserved"]) < item["quantity"]:
                    raise InventoryUnavailable(item["sku"])
                await conn.execute(
                    "UPDATE tb_inventory SET reserved = reserved + $1 WHERE pk_inventory = $2",
                    item["quantity"],
                    row["pk_inventory"],
                )
        await js.publish(
            "events.inventory.reserved",
            json.dumps({"order_id": order_id}).encode(),
        )
        await msg.ack()
    except InventoryUnavailable as short:
        await js.publish(
            "events.order.failed",
            json.dumps({
                "order_id": order_id,
                "reason": "inventory_unavailable",
                "sku": short.sku,
            }).encode(),
        )
        # Business failure, not a transient error — ack so NATS does not redeliver forever
        await msg.ack()
    except Exception as exc:
        await js.publish(
            "events.order.failed",
            json.dumps({"order_id": order_id, "reason": str(exc)}).encode(),
        )
        await msg.nak(delay=5)
    finally:
        await conn.close()


async def main():
    global js
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    await js.subscribe(
        "events.order.created",
        durable="inventory_processors",
        cb=on_order_created,
    )
    print("Inventory worker listening on events.order.created")
    await asyncio.sleep(float("inf"))


if __name__ == "__main__":
    asyncio.run(main())

Payment Worker — External nats.py Service

payment-worker/main.py
"""
Payment worker: subscribes to NATS events using nats.py.
This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json
import os

import asyncpg
import nats

NATS_URL = os.environ["NATS_URL"]
POSTGRES_URL = os.environ["DATABASE_URL"]

js = None  # JetStream context, set in main()


async def on_inventory_reserved(msg):
    """
    Process payment after inventory is reserved.
    Risk: inventory reserved but payment fails — must compensate.
    """
    data = json.loads(msg.data)
    order_id = data["order_id"]
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        order = await conn.fetchrow(
            "SELECT pk_order, total FROM tb_order WHERE id = $1::uuid",
            order_id,
        )
        if order is None:
            await msg.ack()
            return
        # Charge the card via the external payment provider.
        # charge_credit_card wraps the provider SDK and is defined elsewhere in this service.
        payment = await charge_credit_card(
            order_pk=order["pk_order"],
            amount=order["total"],
        )
        await conn.execute(
            """
            INSERT INTO tb_payment (identifier, fk_order, amount, status, provider_id)
            VALUES (gen_random_uuid()::text, $1, $2, 'succeeded', $3)
            """,
            order["pk_order"],
            order["total"],
            payment.id,
        )
        await js.publish(
            "events.payment.charged",
            json.dumps({
                "order_id": order_id,
                "payment_id": payment.id,
                "amount": float(order["total"]),
            }).encode(),
        )
        await msg.ack()
    except Exception as exc:
        # Payment failed — emit a compensation event so inventory is released
        await js.publish(
            "events.payment.failed",
            json.dumps({
                "order_id": order_id,
                "reason": str(exc),
            }).encode(),
        )
        await msg.nak(delay=10)
    finally:
        await conn.close()


async def on_payment_failed(msg):
    """Release reserved inventory when payment fails."""
    data = json.loads(msg.data)
    order_id = data["order_id"]
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        items = await conn.fetch(
            """
            SELECT oi.fk_product, oi.quantity
            FROM tb_order_item oi
            JOIN tb_order o ON o.pk_order = oi.fk_order
            WHERE o.id = $1::uuid
            """,
            order_id,
        )
        for item in items:
            await conn.execute(
                "UPDATE tb_inventory SET reserved = reserved - $1 WHERE fk_product = $2",
                item["quantity"],
                item["fk_product"],
            )
        await js.publish(
            "events.order.failed",
            json.dumps({
                "order_id": order_id,
                "reason": "payment_failed",
                "compensated": True,
            }).encode(),
        )
        await msg.ack()
    finally:
        await conn.close()


async def main():
    global js
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    await js.subscribe(
        "events.inventory.reserved",
        durable="payment_processors",
        cb=on_inventory_reserved,
    )
    await js.subscribe(
        "events.payment.failed",
        durable="compensation_handlers",
        cb=on_payment_failed,
    )
    print("Payment worker listening")
    await asyncio.sleep(float("inf"))


if __name__ == "__main__":
    asyncio.run(main())

Shipping Worker — External nats.py Service

shipping-worker/main.py
"""
Shipping worker: subscribes to NATS events using nats.py.
This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json
import os

import asyncpg
import nats

NATS_URL = os.environ["NATS_URL"]
POSTGRES_URL = os.environ["DATABASE_URL"]

js = None  # JetStream context, set in main()


async def on_payment_charged(msg):
    """
    Create a shipping label after payment succeeds.
    Last step in the choreography.
    """
    data = json.loads(msg.data)
    order_id = data["order_id"]
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # shipping_address is assumed to exist on tb_order (omitted from the schema excerpt above)
        order = await conn.fetchrow(
            "SELECT pk_order, shipping_address FROM tb_order WHERE id = $1::uuid",
            order_id,
        )
        if order is None:
            await msg.ack()
            return
        # create_shipping_label wraps the carrier API and is defined elsewhere in this service
        label = await create_shipping_label(
            order_pk=order["pk_order"],
            address=order["shipping_address"],
        )
        await conn.execute(
            """
            INSERT INTO tb_shipping (identifier, fk_order, provider_label_id, carrier, tracking_number)
            VALUES (gen_random_uuid()::text, $1, $2, $3, $4)
            """,
            order["pk_order"],
            label.id,
            label.carrier,
            label.tracking_number,
        )
        await js.publish(
            "events.order.shipped",
            json.dumps({
                "order_id": order_id,
                "tracking_number": label.tracking_number,
            }).encode(),
        )
        await msg.ack()
    except Exception as exc:
        # Log for manual intervention — shipping is the last step
        print(f"[shipping-worker] ERROR order={order_id}: {exc}")
        await msg.nak(delay=60)
    finally:
        await conn.close()


async def main():
    global js
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    await js.subscribe(
        "events.payment.charged",
        durable="shipping_handlers",
        cb=on_payment_charged,
    )
    print("Shipping worker listening on events.payment.charged")
    await asyncio.sleep(float("inf"))


if __name__ == "__main__":
    asyncio.run(main())
monitoring-worker/main.py
"""
Event monitor: logs all events to tb_event_log.
Standalone nats.py service — not a FraiseQL schema file.
"""
import asyncio
import json
import os

import asyncpg
import nats

NATS_URL = os.environ["NATS_URL"]
POSTGRES_URL = os.environ["DATABASE_URL"]


async def monitor_events(msg):
    """Record all events in tb_event_log for an audit trail."""
    event_type = msg.subject
    data = json.loads(msg.data)
    order_id = data.get("order_id")
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # Look up pk_order for the FK
        order_pk = None
        if order_id:
            row = await conn.fetchrow(
                "SELECT pk_order FROM tb_order WHERE id = $1::uuid",
                order_id,
            )
            if row:
                order_pk = row["pk_order"]
        await conn.execute(
            """
            INSERT INTO tb_event_log (identifier, event_type, fk_order, data)
            VALUES (gen_random_uuid()::text, $1, $2, $3::jsonb)
            """,
            event_type,
            order_pk,
            json.dumps(data),
        )
        if "failed" in event_type:
            # send_alert wraps the alerting channel (Slack, PagerDuty, ...) and is defined elsewhere
            await send_alert({"severity": "warning", "event": event_type, "details": data})
        await msg.ack()
    finally:
        await conn.close()


async def main():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    await js.subscribe("events.>", durable="event_monitoring", cb=monitor_events)
    print("Monitoring worker listening on events.>")
    await asyncio.sleep(float("inf"))


if __name__ == "__main__":
    asyncio.run(main())
tests/test_choreography.py
import asyncio
import os

import httpx
import pytest

# Token for the test auth setup; adjust to your OIDC configuration
TEST_TOKEN = os.environ.get("TEST_TOKEN", "test-token")


@pytest.mark.asyncio
async def test_full_order_choreography():
    """Test the complete happy path through all services via the GraphQL API."""
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        # Create order via FraiseQL GraphQL API
        response = await client.post(
            "/graphql",
            json={
                "query": """
                mutation {
                  createOrder(input: { identifier: "ord-test-1", total: "49.99" }) {
                    id
                    status
                  }
                }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        data = response.json()
        order_id = data["data"]["createOrder"]["id"]
        assert data["data"]["createOrder"]["status"] == "pending"

    # Give all workers time to process the choreography
    await asyncio.sleep(3.0)

    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={"query": f'query {{ order(id: "{order_id}") {{ id status }} }}'},
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        final = response.json()
        assert final["data"]["order"]["status"] == "shipped"


@pytest.mark.asyncio
async def test_payment_failure_compensation():
    """Test that inventory is released when payment fails."""
    # Use a known-bad order identifier to trigger payment failure
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={
                "query": """
                mutation {
                  createOrder(input: { identifier: "ord-bad-card-1", total: "49.99" }) {
                    id
                  }
                }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        order_id = response.json()["data"]["createOrder"]["id"]

    await asyncio.sleep(3.0)

    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={"query": f'query {{ order(id: "{order_id}") {{ status }} }}'},
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        assert response.json()["data"]["order"]["status"] == "failed"
  1. FraiseQL owns the write side: Mutations call SQL functions; the Rust runtime handles GraphQL execution.
  2. NATS events via observers: [observers] TOML config routes pg_notify events to NATS — no Python code required.
  3. External workers use nats.py: Downstream services are independent processes; they are not FraiseQL schema files.
  4. No transactions across services: Each service manages its own state; NATS delivers events asynchronously.
  5. Compensation pattern: Payment failure triggers an events.payment.failed event that the compensation handler processes to release inventory.
  6. Idempotency: Each worker handler is idempotent — safe to replay if NATS redelivers a message.
  7. Audit trail: The monitoring worker logs all events to tb_event_log for observability.
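Point 6 deserves a concrete shape. A common way to make a handler replay-safe is to key each event on a unique id and skip duplicates. The sketch below uses an in-memory set for clarity; in production the dedup store would be a UNIQUE-constrained column (tb_event_log is a natural home), and the `event_id` field is an assumption, not part of the payloads shown above:

```python
import json

processed: set[str] = set()  # in production: a UNIQUE-constrained column in PostgreSQL


def handle_once(event: bytes) -> bool:
    """Process an event exactly once per event_id; return True if work was done."""
    data = json.loads(event)
    event_id = data["event_id"]
    if event_id in processed:
        return False  # duplicate delivery: ack and skip
    # ... do the actual side effect (reserve stock, charge card, ...) ...
    processed.add(event_id)
    return True


msg = json.dumps({"event_id": "evt-1", "order_id": "o-1"}).encode()
handle_once(msg)  # first delivery does the work
handle_once(msg)  # redelivery is a no-op
```

With this in place, a JetStream redelivery (after a missed ack, a worker crash, or a nak) is harmless.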
  1. Start all services with Docker Compose:

    docker-compose up -d

    Verify services are running:

    docker-compose ps

    Expected output:

    NAME                                            STATUS         PORTS
    microservices-choreography-postgres-1           Up (healthy)   0.0.0.0:5432->5432/tcp
    microservices-choreography-nats-1               Up (healthy)   0.0.0.0:4222->4222/tcp
    microservices-choreography-order-service-1      Up             0.0.0.0:8080->8080/tcp
    microservices-choreography-inventory-worker-1   Up
    microservices-choreography-payment-worker-1     Up
    microservices-choreography-shipping-worker-1    Up
  2. Check NATS is ready:

    docker-compose exec nats nats server info

    Look for jetstream: enabled: true in output.

  3. Create a test order:

    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN" \
    -d '{
    "query": "mutation { createOrder(input: { identifier: \"ord-001\", total: \"49.99\" }) { id status } }"
    }'

    Expected response:

    {
      "data": {
        "createOrder": {
          "id": "550e8400-e29b-41d4-a716-446655440000",
          "status": "pending"
        }
      }
    }
  4. Wait for choreography to complete (allow all workers time to process):

    sleep 5
  5. Verify order reached final state:

    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN" \
    -d '{
    "query": "query { order(id: \"550e8400-e29b-41d4-a716-446655440000\") { id status } }"
    }'

    Expected response:

    {
      "data": {
        "order": {
          "id": "550e8400-e29b-41d4-a716-446655440000",
          "status": "shipped"
        }
      }
    }
  6. Check event log for complete flow:

    docker-compose exec postgres psql -U fraiseql -d orders -c \
    "SELECT event_type, recorded_at FROM tb_event_log ORDER BY recorded_at;"

    Expected output:

    event_type                 | recorded_at
    ---------------------------+---------------------
    events.order.created       | 2024-01-15 10:30:00
    events.inventory.reserved  | 2024-01-15 10:30:01
    events.payment.charged     | 2024-01-15 10:30:02
    events.order.shipped       | 2024-01-15 10:30:05
  7. Test compensation (failure scenario):

    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN" \
    -d '{
    "query": "mutation { createOrder(input: { identifier: \"ord-fail-001\", total: \"9999.99\" }) { id } }"
    }'

    Wait and verify order failed:

    sleep 3
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN" \
    -d '{"query": "query { order(id: \"<id-from-above>\") { status } }"}'

    Expected: {"status": "failed"}

  8. Run integration tests:

    pytest tests/test_choreography.py -v

    Expected output:

    tests/test_choreography.py::test_full_order_choreography PASSED
    tests/test_choreography.py::test_payment_failure_compensation PASSED
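The fixed sleep(3.0) in the tests is the simplest option but flakes when workers are slow. A small polling helper is more robust; this is a sketch (`wait_for` and its parameters are illustrative, not part of the test suite above) that retries an async status getter until the expected value appears or a deadline passes:

```python
import asyncio
import time


async def wait_for(fetch_status, expected: str, timeout: float = 10.0, interval: float = 0.25) -> str:
    """Poll an async status getter until it returns `expected` or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status == expected:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"never reached {expected!r}, last saw {status!r}")
        await asyncio.sleep(interval)


# Stub standing in for the GraphQL order-status query: "shipped" on the third poll.
polls = {"n": 0}
async def fake_status():
    polls["n"] += 1
    return "shipped" if polls["n"] >= 3 else "pending"


print(asyncio.run(wait_for(fake_status, "shipped", interval=0.01)))  # prints "shipped"
```

In the real tests, `fetch_status` would be a closure around the `order(id: ...)` GraphQL query, replacing both `asyncio.sleep(3.0)` calls.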
If something goes wrong, restart the stack and inspect the logs:

docker-compose up -d
docker-compose logs order-service

Common issues:

  1. Port already in use: Change ports in docker-compose.yml
  2. Database not ready: depends_on only orders container startup — it does not wait for PostgreSQL to accept connections. Add a healthcheck (with depends_on: condition: service_healthy) or retry the connection in each service
  3. NATS connection failed: Verify NATS is running: docker-compose logs nats
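For issue 2, a startup retry loop is more robust than tuning wait times. The sketch below retries any async connect function with a capped exponential backoff; `connect_with_retry` is illustrative — in the workers the callable would wrap asyncpg.connect(POSTGRES_URL):

```python
import asyncio


async def connect_with_retry(connect, attempts: int = 10, base_delay: float = 0.5):
    """Call an async connect() until it succeeds, backing off between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** (attempt - 1), 5.0)
            print(f"connect attempt {attempt} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)


# Stub that fails twice before succeeding, standing in for asyncpg.connect.
state = {"tries": 0}
async def flaky_connect():
    state["tries"] += 1
    if state["tries"] < 3:
        raise ConnectionError("database is still initializing")
    return "connection"


asyncio.run(connect_with_retry(flaky_connect, base_delay=0.01))
```

Each worker's main() could call this once at startup before subscribing to NATS.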

If order does not progress:

  1. Check NATS consumers:

    docker-compose exec nats nats consumer report
  2. Verify workers are connected:

    docker-compose exec nats nats consumer info events inventory_processors
  3. Check worker logs:

    docker-compose logs inventory-worker | tail -20
    docker-compose logs payment-worker | tail -20
  4. Manual event inspection:

    docker-compose exec nats nats sub events.order.created

If tb_event_log is empty:

  1. Check the FraiseQL observer configuration:

    docker-compose logs order-service | grep "observer"
  2. Verify the [observers] section in fraiseql.toml is correct and nats_url is reachable.

  3. Check NATS stream exists:

    docker-compose exec nats nats stream report

If payment failure does not release inventory:

  1. Check the payment worker compensation handler log:

    docker-compose logs payment-worker | grep "compensation"
  2. Verify events.payment.failed subject is being published:

    docker-compose exec nats nats sub events.payment.failed

If choreography is slow:

  1. Check consumer lag:

    docker-compose exec nats nats consumer info events inventory_processors

    Look for num_pending (should be near 0)

  2. Scale workers horizontally: Run multiple instances of each worker with the same durable consumer name — NATS JetStream distributes messages across them.

  3. Monitor database connections:

    docker-compose exec postgres psql -U fraiseql -c \
    "SELECT count(*) FROM pg_stat_activity;"
  • Monitor NATS consumer lag (how far behind consumers are)
  • Set up a Dead Letter Queue (DLQ) stream for permanently failed messages
  • Implement circuit breakers in workers for external service calls (payment provider, shipping API)
  • Use durable consumer groups for load distribution across worker replicas
  • Enable per-service metrics; instrument each worker with Prometheus counters on ack/nak
  • Tune FraiseQL [observers] batch size and flush interval for throughput
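A minimal circuit breaker for the external calls mentioned above (payment provider, shipping API) can look like the sketch below — illustrative, not a production library. After max_failures consecutive errors it fails fast, then lets one trial call through once reset_after seconds have passed:

```python
import time


class CircuitBreaker:
    """Fail fast after repeated errors; allow a trial call after a cool-down."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when the circuit opened

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: skipping external call")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success closes the circuit
        return result


breaker = CircuitBreaker(max_failures=2, reset_after=60.0)

def bad_call():
    raise IOError("provider down")

for _ in range(2):
    try:
        breaker.call(bad_call)
    except IOError:
        pass
# The next call is rejected immediately, without touching the provider:
try:
    breaker.call(bad_call)
except RuntimeError as exc:
    print(exc)  # prints "circuit open: skipping external call"
```

In the payment worker, `charge_credit_card` would be wrapped in `breaker.call(...)` so a down provider naks quickly instead of tying up the consumer.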

Advanced Federation

Cross-service queries and data composition.

Advanced NATS

Reliability patterns and JetStream.

Observers Reference

TOML observer configuration reference.