Skip to content

Microservices Choreography with NATS

A complete microservices architecture using event-driven choreography with federation and NATS for distributed order processing.

Order Service ──── events.order.created ────► Inventory Service
events.inventory.reserved
Payment Service
events.payment.charged
Shipping Service
events.order.shipped
sequenceDiagram
participant C as Client
participant OS as Order Service
participant INV as Inventory Service
participant PAY as Payment Service
participant SHP as Shipping Service
participant N as NATS
C->>OS: createOrder(items, total)
OS->>N: events.order.created
OS-->>C: Order{status: "pending"}
N->>INV: events.order.created
INV->>N: events.inventory.reserved
Note over INV: Reserve stock
N->>PAY: events.inventory.reserved
PAY->>N: events.payment.charged
Note over PAY: Charge card
N->>OS: events.payment.charged
OS->>OS: status = "paid"
N->>SHP: events.payment.charged
SHP->>N: events.order.shipped
Note over SHP: Create label
N->>OS: events.order.shipped
OS->>OS: status = "shipped"
docker-compose.yml
version: '3.9'
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: orders
POSTGRES_USER: fraiseql
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
nats:
image: nats:latest
command: ["-js"]
ports:
- "4222:4222"
order-service:
build: ./order-service
depends_on: [postgres, nats]
environment:
DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
NATS_URL: nats://nats:4222
inventory-service:
build: ./inventory-service
depends_on: [postgres, nats]
environment:
DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
NATS_URL: nats://nats:4222
payment-service:
build: ./payment-service
depends_on: [postgres, nats]
environment:
DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
NATS_URL: nats://nats:4222
shipping-service:
build: ./shipping-service
depends_on: [postgres, nats]
environment:
DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
NATS_URL: nats://nats:4222

Start all services with:

Terminal window
docker-compose up -d
import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
from decimal import Decimal
@fraiseql.type
class Order:
id: ID
user_id: ID
total: Decimal
status: str
created_at: datetime
@fraiseql.mutation(operation="CREATE")
async def create_order(
user_id: ID,
items: list[dict],
total: Decimal
) -> Order:
"""
Create order and emit event.
Does NOT block waiting for downstream services.
Event-driven choreography handles the rest.
"""
order = await ctx.db.execute(
"""
INSERT INTO tb_order (user_id, total, status, created_at)
VALUES ($1, $2, 'pending', NOW())
RETURNING id, user_id, total, status, created_at
""",
[user_id, total]
)
# Publish event (fire and forget)
await fraiseql.nats.publish(
subject="events.order.created",
data={
"order_id": str(order['id']),
"user_id": str(user_id),
"items": items,
"total": float(total),
"timestamp": datetime.now().isoformat()
}
)
return Order(**order)
# Track order status updates from other services
@fraiseql.observer(
entity="Order",
event="CUSTOM:inventory.reserved"
)
async def on_inventory_reserved(message, ctx):
"""Update order status when inventory is reserved."""
await ctx.db.execute(
"UPDATE tb_order SET status = 'reserved' WHERE id = $1",
[message.data["order_id"]]
)
@fraiseql.observer(
entity="Order",
event="CUSTOM:payment.charged"
)
async def on_payment_charged(message, ctx):
"""Update order status when payment is charged."""
await ctx.db.execute(
"UPDATE tb_order SET status = 'paid' WHERE id = $1",
[message.data["order_id"]]
)
@fraiseql.observer(
entity="Order",
event="CUSTOM:order.shipped"
)
async def on_order_shipped(message, ctx):
"""Update order status when shipped."""
await ctx.db.execute(
"UPDATE tb_order SET status = 'shipped' WHERE id = $1",
[message.data["order_id"]]
)
import fraiseql
# Listen for order events
@fraiseql.nats.subscribe(
subject="events.order.created",
consumer_group="inventory_processors",
max_concurrent=10
)
async def on_order_created(message):
"""
React to order creation.
Reserve inventory or emit failure event.
"""
order_data = message.data
order_id = order_data["order_id"]
items = order_data["items"]
try:
# Check and reserve inventory
for item in items:
available = await ctx.db.query_one(
"""
SELECT pk_inventory, quantity, reserved
FROM tb_inventory
WHERE fk_product = (SELECT pk_product FROM tb_product WHERE identifier = $1)
""",
[item["sku"]]
)
if available["quantity"] - available["reserved"] < item["quantity"]:
raise Exception(f"Insufficient inventory for {item['sku']}")
# Reserve (increase reserved count)
await ctx.db.execute(
"""
UPDATE tb_inventory
SET reserved = reserved + $1
WHERE pk_inventory = $2
""",
[item["quantity"], available["pk_inventory"]]
)
# Emit success event
await fraiseql.nats.publish(
subject="events.inventory.reserved",
data={
"order_id": order_id,
"reserved_at": datetime.now().isoformat()
}
)
await message.ack()
except Exception as e:
# Emit failure event
await fraiseql.nats.publish(
subject="events.order.failed",
data={
"order_id": order_id,
"reason": "inventory_unavailable",
"error": str(e)
}
)
# Don't ack - will be retried
await message.nak(timeout=5000)
@fraiseql.nats.subscribe(
subject="events.inventory.reserved",
consumer_group="payment_processors",
max_concurrent=5 # Limit concurrency for payment processing
)
async def on_inventory_reserved(message):
"""
Process payment after inventory is reserved.
Risk: inventory reserved but payment fails - must compensate.
"""
event_data = message.data
order_id = event_data["order_id"]
try:
# Get order details
order = await ctx.db.query_one(
"SELECT id, user_id, total FROM tb_order WHERE id = $1",
[order_id]
)
# Attempt payment
payment = await charge_credit_card(
customer_id=order["user_id"],
amount=order["total"]
)
# Record payment
await ctx.db.execute(
"""
INSERT INTO tb_payment (order_id, amount, status, payment_id, created_at)
VALUES ($1, $2, 'succeeded', $3, NOW())
""",
[order_id, order["total"], payment.id]
)
# Emit success
await fraiseql.nats.publish(
subject="events.payment.charged",
data={
"order_id": order_id,
"payment_id": payment.id,
"amount": float(order["total"])
}
)
await message.ack()
except Exception as e:
# Payment failed - emit compensation event
await fraiseql.nats.publish(
subject="events.payment.failed",
data={
"order_id": order_id,
"reason": str(e),
"action": "release_inventory"
}
)
await message.nak(timeout=10000)
# Handle payment failures with compensation
@fraiseql.nats.subscribe(
subject="events.payment.failed",
consumer_group="compensation_handlers"
)
async def on_payment_failed(message):
"""Release reserved inventory if payment fails."""
event_data = message.data
order_id = event_data["order_id"]
# Get original order items
items = await ctx.db.query(
"""
SELECT oi.product_id, oi.quantity
FROM tb_order_item oi
WHERE oi.fk_order = (SELECT pk_order FROM tb_order WHERE id = $1)
""",
[order_id]
)
# Release inventory
for item in items:
await ctx.db.execute(
"""
UPDATE tb_inventory
SET reserved = reserved - $1
WHERE fk_product = (SELECT pk_product FROM tb_product WHERE id = $2)
""",
[item["quantity"], item["product_id"]]
)
# Emit order failed event
await fraiseql.nats.publish(
subject="events.order.failed",
data={
"order_id": order_id,
"reason": "payment_failed",
"compensated": True
}
)
await message.ack()
@fraiseql.nats.subscribe(
subject="events.payment.charged",
consumer_group="shipping_handlers"
)
async def on_payment_charged(message):
"""
Create shipping label after payment succeeds.
Last step in the choreography - no compensation needed.
"""
event_data = message.data
order_id = event_data["order_id"]
try:
# Get order details
order = await ctx.db.query_one(
"SELECT id, shipping_address FROM tb_order WHERE id = $1",
[order_id]
)
# Create shipping label
label = await create_shipping_label(
order_id=order_id,
address=order["shipping_address"]
)
# Store label
await ctx.db.execute(
"""
INSERT INTO tb_shipping (order_id, label_id, carrier, tracking, created_at)
VALUES ($1, $2, $3, $4, NOW())
""",
[order_id, label.id, label.carrier, label.tracking_number]
)
# Publish completion event
await fraiseql.nats.publish(
subject="events.order.shipped",
data={
"order_id": order_id,
"tracking_number": label.tracking_number
}
)
await message.ack()
except Exception as e:
# Log for manual intervention
await log_error({
"order_id": order_id,
"step": "shipping",
"error": str(e)
})
# Don't ack - manual retry needed
await message.nak(timeout=60000)
# Track event propagation
@fraiseql.nats.subscribe(
subject="events.>",
consumer_group="event_monitoring"
)
async def monitor_events(message):
"""Monitor all events for debugging."""
event_type = message.subject
data = message.data
await ctx.db.execute(
"""
INSERT INTO tb_event_log (event_type, order_id, data, recorded_at)
VALUES ($1, $2, $3::jsonb, NOW())
""",
[event_type, data.get("order_id"), json.dumps(data)]
)
# Alert on failures
if "failed" in event_type:
await send_alert({
"severity": "warning",
"event": event_type,
"details": data
})
await message.ack()
@pytest.mark.asyncio
async def test_full_order_choreography():
"""Test complete happy path through all services."""
# Create order
order = await order_service.create_order(
user_id="user-1",
items=[{"sku": "WIDGET", "quantity": 2}],
total=49.99
)
# Give services time to process
await asyncio.sleep(1.0)
# Verify final state
final_order = await get_order(order.id)
assert final_order.status == "shipped"
# Verify all events were published
events = await get_event_history(order.id)
assert "order.created" in events
assert "inventory.reserved" in events
assert "payment.charged" in events
assert "order.shipped" in events
@pytest.mark.asyncio
async def test_payment_failure_compensation():
"""Test compensation when payment fails."""
order = await order_service.create_order(
user_id="user-with-bad-card",
items=[{"sku": "WIDGET", "quantity": 2}],
total=49.99
)
# Payment will fail due to bad card
await asyncio.sleep(1.0)
# Verify compensation occurred
final_order = await get_order(order.id)
assert final_order.status == "failed"
# Verify inventory was released
inventory = await get_inventory("WIDGET")
assert inventory.reserved == 0
  1. Fire and Forget: Order service doesn’t wait for other services
  2. Event-Driven: Services communicate only through events
  3. No Transactions: Each service manages its own state
  4. Compensation: Services can undo work if downstream fails
  5. Idempotency: Each event handler is idempotent (safe to replay)
  6. Visibility: Event log provides complete audit trail
  1. Start all services with Docker Compose:

    Terminal window
    docker-compose up -d

    Verify services are running:

    Terminal window
    docker-compose ps

    Expected output:

    NAME STATUS PORTS
    microservices-choreography-postgres-1 Up (healthy) 0.0.0.0:5432->5432/tcp
    microservices-choreography-nats-1 Up (healthy) 0.0.0.0:4222->4222/tcp
    microservices-choreography-order-service-1 Up 0.0.0.0:8080->8080/tcp
    microservices-choreography-inventory-service-1 Up
    microservices-choreography-payment-service-1 Up
    microservices-choreography-shipping-service-1 Up
  2. Check NATS is ready:

    Terminal window
    # Verify NATS JetStream is enabled
    docker-compose exec nats nats server info

    Look for jetstream: enabled: true in output.

  3. Create a test order:

    Terminal window
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -d '{
    "query": "mutation { createOrder(userId: \"user-123\", items: [{sku: \"WIDGET\", quantity: 2}], total: 49.99) { id status } }"
    }'

    Expected response:

    {
    "data": {
    "createOrder": {
    "id": "ord-abc123",
    "status": "pending"
    }
    }
    }
  4. Wait for choreography to complete (5-10 seconds for all services):

    Terminal window
    sleep 5
  5. Verify order reached final state:

    Terminal window
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -d '{
    "query": "query { order(id: \"ord-abc123\") { id status trackingNumber } }"
    }'

    Expected response:

    {
    "data": {
    "order": {
    "id": "ord-abc123",
    "status": "shipped",
    "trackingNumber": "TRK-789XYZ"
    }
    }
    }
  6. Check event log for complete flow:

    Terminal window
    docker-compose exec postgres psql -U fraiseql -d orders -c \
    "SELECT event_type, order_id, recorded_at FROM tb_event_log WHERE order_id = 'ord-abc123' ORDER BY recorded_at;"

    Expected output:

    event_type | order_id | recorded_at
    ------------------------+-------------+----------------------------
    events.order.created | ord-abc123 | 2024-01-15 10:30:00
    events.inventory.reserved | ord-abc123 | 2024-01-15 10:30:01
    events.payment.charged | ord-abc123 | 2024-01-15 10:30:02
    events.order.shipped | ord-abc123 | 2024-01-15 10:30:05
  7. Test compensation (failure scenario):

    Terminal window
    # Create order with insufficient inventory
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -d '{
    "query": "mutation { createOrder(userId: \"user-123\", items: [{sku: \"OUTOFSTOCK\", quantity: 999}], total: 9999.99) { id } }"
    }'

    Wait and verify order failed:

    Terminal window
    sleep 2
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -d '{"query": "query { order(id: \"ord-failed\") { status } }"}'

    Expected: {"status": "failed"}

  8. Run integration tests:

    Terminal window
    pytest tests/test_choreography.py -v

    Expected output:

    tests/test_choreography.py::test_full_order_choreography PASSED
    tests/test_choreography.py::test_payment_failure_compensation PASSED
    tests/test_choreography.py::test_inventory_failure PASSED
Terminal window
docker-compose up -d
docker-compose logs order-service

Common issues:

  1. Port already in use: Change ports in docker-compose.yml
  2. Database not ready: Services have depends_on but PostgreSQL needs time to initialize
  3. NATS connection failed: Verify NATS is running: docker-compose logs nats

If order doesn’t progress:

  1. Check NATS consumers:

    Terminal window
    docker-compose exec nats nats consumer report
  2. Verify consumers are connected:

    Terminal window
    docker-compose exec nats nats consumer info events inventory_processors
  3. Check service logs:

    Terminal window
    docker-compose logs inventory-service | tail -20
    docker-compose logs payment-service | tail -20
  4. Manual event inspection:

    Terminal window
    docker-compose exec nats nats sub events.order.created

If tb_event_log is empty:

  1. Verify NATS publishing is working:

    # In Python console
    import asyncio
    from fraiseql import nats
    await nats.publish("test.subject", {"test": "data"})
  2. Check NATS stream exists:

    Terminal window
    docker-compose exec nats nats stream report

If payment failure doesn’t release inventory:

  1. Check compensation handler is registered:

    Terminal window
    docker-compose logs payment-service | grep "compensation"
  2. Verify events.payment.failed subject:

    Terminal window
    docker-compose exec nats nats sub events.payment.failed

If choreography is slow:

  1. Check consumer lag:

    Terminal window
    docker-compose exec nats nats consumer info events inventory_processors

    Look for num_pending (should be near 0)

  2. Increase concurrency:

    @fraiseql.nats.subscribe(
    subject="events.order.created",
    consumer_group="inventory_processors",
    max_concurrent=20 # Increase from 10
    )
  3. Monitor database connections:

    Terminal window
    docker-compose exec postgres psql -U fraiseql -c \
    "SELECT count(*) FROM pg_stat_activity;"

If tests fail:

  1. Ensure all services are healthy:

    Terminal window
    docker-compose ps
  2. Clear test data between runs:

    Terminal window
    docker-compose exec postgres psql -U fraiseql -d orders -c \
    "TRUNCATE tb_order, tb_event_log CASCADE;"
  3. Increase sleep time in tests if services are slow:

    await asyncio.sleep(2.0) # Instead of 1.0
  • Monitor event lag (how far behind consumers are)
  • Set up DLQ for permanently failed messages
  • Implement circuit breakers for external service calls
  • Use consumer groups for load distribution
  • Enable metrics for each service and event type

Advanced Federation

Cross-service queries and data composition.

Advanced NATS

Reliability patterns and JetStream.

Custom Resolvers

Service-specific business logic.