Advanced Federation
Cross-service queries and data composition.
A complete microservices architecture using event-driven choreography with federation and NATS for distributed order processing.
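Before the full stack, the choreography idea can be illustrated with a minimal in-memory sketch. This is not FraiseQL or NATS code: the InMemoryBus class and the handler names are hypothetical stand-ins, and the sketch assumes only that each service reacts to one subject and publishes the next, with no central coordinator.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for a message broker: subject -> handlers."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.log = []  # subjects in publication order

    def subscribe(self, subject, handler):
        self.handlers[subject].append(handler)

    async def publish(self, subject, data):
        self.log.append(subject)
        for handler in list(self.handlers[subject]):
            await handler(data)


def build_choreography(bus, orders):
    """Wire hypothetical services together using only events."""

    async def reserve_inventory(data):  # Inventory Service
        await bus.publish("events.inventory.reserved", data)

    async def charge_payment(data):  # Payment Service
        await bus.publish("events.payment.charged", data)

    async def mark_paid(data):  # Order Service
        orders[data["order_id"]] = "paid"

    async def ship_order(data):  # Shipping Service
        await bus.publish("events.order.shipped", data)

    async def mark_shipped(data):  # Order Service
        orders[data["order_id"]] = "shipped"

    bus.subscribe("events.order.created", reserve_inventory)
    bus.subscribe("events.inventory.reserved", charge_payment)
    bus.subscribe("events.payment.charged", mark_paid)
    bus.subscribe("events.payment.charged", ship_order)
    bus.subscribe("events.order.shipped", mark_shipped)


async def run_demo():
    bus, orders = InMemoryBus(), {"ord-1": "pending"}
    build_choreography(bus, orders)
    await bus.publish("events.order.created", {"order_id": "ord-1"})
    return bus.log, orders
```

Calling asyncio.run(run_demo()) returns the subjects in publication order together with the final order states. Unlike a real broker, this bus delivers synchronously and in-process, so it shows only the wiring, not delivery guarantees.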
Order Service ──── events.order.created ────► Inventory Service
                                                      │
                                          events.inventory.reserved
                                                      │
                                                      ▼
                                               Payment Service
                                                      │
                                            events.payment.charged
                                                      │
                                                      ▼
                                              Shipping Service
                                                      │
                                             events.order.shipped

The same flow as a sequence diagram:

    sequenceDiagram
        participant C as Client
        participant OS as Order Service
        participant INV as Inventory Service
        participant PAY as Payment Service
        participant SHP as Shipping Service
        participant N as NATS

        C->>OS: createOrder(items, total)
        OS->>N: events.order.created
        OS-->>C: Order{status: "pending"}

        N->>INV: events.order.created
        INV->>N: events.inventory.reserved
        Note over INV: Reserve stock

        N->>PAY: events.inventory.reserved
        PAY->>N: events.payment.charged
        Note over PAY: Charge card

        N->>OS: events.payment.charged
        OS->>OS: status = "paid"

        N->>SHP: events.payment.charged
        SHP->>N: events.order.shipped
        Note over SHP: Create label

        N->>OS: events.order.shipped
        OS->>OS: status = "shipped"

Docker Compose configuration:

    version: '3.9'
    services:
      postgres:
        image: postgres:16-alpine
        environment:
          POSTGRES_DB: orders
          POSTGRES_USER: fraiseql
          POSTGRES_PASSWORD: password
        ports:
          - "5432:5432"

      nats:
        image: nats:latest
        command: ["-js"]
        ports:
          - "4222:4222"

      order-service:
        build: ./order-service
        depends_on: [postgres, nats]
        environment:
          DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
          NATS_URL: nats://nats:4222
        # Published so the GraphQL endpoint at localhost:8080 used below is reachable
        ports:
          - "8080:8080"

      inventory-service:
        build: ./inventory-service
        depends_on: [postgres, nats]
        environment:
          DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
          NATS_URL: nats://nats:4222

      payment-service:
        build: ./payment-service
        depends_on: [postgres, nats]
        environment:
          DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
          NATS_URL: nats://nats:4222

      shipping-service:
        build: ./shipping-service
        depends_on: [postgres, nats]
        environment:
          DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
          NATS_URL: nats://nats:4222

Start all services with:
    docker-compose up -d

Order Service:

    import fraiseql
    from fraiseql.scalars import ID
    from datetime import datetime
    from decimal import Decimal

    @fraiseql.type
    class Order:
        id: ID
        user_id: ID
        total: Decimal
        status: str
        created_at: datetime

    @fraiseql.mutation(operation="CREATE")
    async def create_order(
        user_id: ID,
        items: list[dict],
        total: Decimal
    ) -> Order:
        """
        Create order and emit event.
        Does NOT block waiting for downstream services.
        Event-driven choreography handles the rest.
        """
        order = await ctx.db.execute(
            """
            INSERT INTO tb_order (user_id, total, status, created_at)
            VALUES ($1, $2, 'pending', NOW())
            RETURNING id, user_id, total, status, created_at
            """,
            [user_id, total]
        )

        # Publish event (fire and forget)
        await fraiseql.nats.publish(
            subject="events.order.created",
            data={
                "order_id": str(order['id']),
                "user_id": str(user_id),
                "items": items,
                "total": float(total),
                "timestamp": datetime.now().isoformat()
            }
        )

        return Order(**order)
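The event payload above converts the total with float(total) and uses a naive local timestamp. As one possible variation, the sketch below builds a JSON-safe payload that carries the total as a string and the timestamp in UTC. The helper names build_order_created_payload and parse_total are hypothetical and not part of FraiseQL.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal


def build_order_created_payload(order_id, user_id, items, total):
    """Build a JSON-safe payload (hypothetical helper, not a FraiseQL API)."""
    return {
        "order_id": str(order_id),
        "user_id": str(user_id),
        "items": items,
        # A string preserves the exact decimal value; float() may not.
        "total": str(total),
        # Timezone-aware UTC timestamp, unambiguous across services.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def parse_total(payload):
    """Consumer side: recover the exact Decimal from the payload."""
    return Decimal(payload["total"])
```

A consumer that needs the amount for charging can call parse_total on the decoded message and work with Decimal end to end.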
    # Track order status updates from other services
    @fraiseql.observer(
        entity="Order",
        event="CUSTOM:inventory.reserved"
    )
    async def on_inventory_reserved(message, ctx):
        """Update order status when inventory is reserved."""
        await ctx.db.execute(
            "UPDATE tb_order SET status = 'reserved' WHERE id = $1",
            [message.data["order_id"]]
        )

    @fraiseql.observer(
        entity="Order",
        event="CUSTOM:payment.charged"
    )
    async def on_payment_charged(message, ctx):
        """Update order status when payment is charged."""
        await ctx.db.execute(
            "UPDATE tb_order SET status = 'paid' WHERE id = $1",
            [message.data["order_id"]]
        )

    @fraiseql.observer(
        entity="Order",
        event="CUSTOM:order.shipped"
    )
    async def on_order_shipped(message, ctx):
        """Update order status when shipped."""
        await ctx.db.execute(
            "UPDATE tb_order SET status = 'shipped' WHERE id = $1",
            [message.data["order_id"]]
        )

Inventory Service:

    import fraiseql
    from datetime import datetime  # needed for reserved_at below

    # Listen for order events
    @fraiseql.nats.subscribe(
        subject="events.order.created",
        consumer_group="inventory_processors",
        max_concurrent=10
    )
    async def on_order_created(message):
        """
        React to order creation.
        Reserve inventory or emit failure event.
        """
        order_data = message.data
        order_id = order_data["order_id"]
        items = order_data["items"]

        try:
            # Check and reserve inventory
            for item in items:
                available = await ctx.db.query_one(
                    """
                    SELECT pk_inventory, quantity, reserved
                    FROM tb_inventory
                    WHERE fk_product = (SELECT pk_product FROM tb_product WHERE identifier = $1)
                    """,
                    [item["sku"]]
                )

                if available["quantity"] - available["reserved"] < item["quantity"]:
                    raise Exception(f"Insufficient inventory for {item['sku']}")

                # Reserve (increase reserved count)
                await ctx.db.execute(
                    """
                    UPDATE tb_inventory
                    SET reserved = reserved + $1
                    WHERE pk_inventory = $2
                    """,
                    [item["quantity"], available["pk_inventory"]]
                )

            # Emit success event
            await fraiseql.nats.publish(
                subject="events.inventory.reserved",
                data={
                    "order_id": order_id,
                    "reserved_at": datetime.now().isoformat()
                }
            )

            await message.ack()

        except Exception as e:
            # Emit failure event
            await fraiseql.nats.publish(
                subject="events.order.failed",
                data={
                    "order_id": order_id,
                    "reason": "inventory_unavailable",
                    "error": str(e)
                }
            )

            # Don't ack - will be retried
            await message.nak(timeout=5000)

Payment Service:

    @fraiseql.nats.subscribe(
        subject="events.inventory.reserved",
        consumer_group="payment_processors",
        max_concurrent=5  # Limit concurrency for payment processing
    )
    async def on_inventory_reserved(message):
        """
        Process payment after inventory is reserved.
        Risk: inventory reserved but payment fails - must compensate.
        """
        event_data = message.data
        order_id = event_data["order_id"]

        try:
            # Get order details
            order = await ctx.db.query_one(
                "SELECT id, user_id, total FROM tb_order WHERE id = $1",
                [order_id]
            )

            # Attempt payment
            payment = await charge_credit_card(
                customer_id=order["user_id"],
                amount=order["total"]
            )

            # Record payment
            await ctx.db.execute(
                """
                INSERT INTO tb_payment (order_id, amount, status, payment_id, created_at)
                VALUES ($1, $2, 'succeeded', $3, NOW())
                """,
                [order_id, order["total"], payment.id]
            )

            # Emit success
            await fraiseql.nats.publish(
                subject="events.payment.charged",
                data={
                    "order_id": order_id,
                    "payment_id": payment.id,
                    "amount": float(order["total"])
                }
            )

            await message.ack()

        except Exception as e:
            # Payment failed - emit compensation event
            await fraiseql.nats.publish(
                subject="events.payment.failed",
                data={
                    "order_id": order_id,
                    "reason": str(e),
                    "action": "release_inventory"
                }
            )

            await message.nak(timeout=10000)
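Because a nak leads to redelivery, and JetStream delivery is at-least-once, a handler such as the payment processor can see the same event more than once. The sketch below shows one common way to make a handler idempotent. IdempotentHandler is a hypothetical helper, and its in-memory set stands in for a durable store (for example, a table with a unique constraint on the order ID).

```python
import asyncio


class IdempotentHandler:
    """Wrap an async handler so each key is processed at most once.

    Hypothetical helper: the in-memory set stands in for a durable
    store shared by all instances of the service.
    """

    def __init__(self, handler, key="order_id"):
        self.handler = handler
        self.key = key
        self.processed = set()

    async def __call__(self, data):
        event_key = data[self.key]
        if event_key in self.processed:
            return False  # duplicate delivery: skip side effects
        await self.handler(data)
        # Mark as processed only after the handler succeeds,
        # so a failed attempt can still be retried.
        self.processed.add(event_key)
        return True
```

Wrapping the charging logic this way means a redelivered events.inventory.reserved message does not charge the card a second time, while a failed attempt remains eligible for retry.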
    # Handle payment failures with compensation
    @fraiseql.nats.subscribe(
        subject="events.payment.failed",
        consumer_group="compensation_handlers"
    )
    async def on_payment_failed(message):
        """Release reserved inventory if payment fails."""
        event_data = message.data
        order_id = event_data["order_id"]

        # Get original order items
        items = await ctx.db.query(
            """
            SELECT oi.product_id, oi.quantity
            FROM tb_order_item oi
            WHERE oi.fk_order = (SELECT pk_order FROM tb_order WHERE id = $1)
            """,
            [order_id]
        )

        # Release inventory
        for item in items:
            await ctx.db.execute(
                """
                UPDATE tb_inventory
                SET reserved = reserved - $1
                WHERE fk_product = (SELECT pk_product FROM tb_product WHERE id = $2)
                """,
                [item["quantity"], item["product_id"]]
            )

        # Emit order failed event
        await fraiseql.nats.publish(
            subject="events.order.failed",
            data={
                "order_id": order_id,
                "reason": "payment_failed",
                "compensated": True
            }
        )

        await message.ack()

Shipping Service:

    @fraiseql.nats.subscribe(
        subject="events.payment.charged",
        consumer_group="shipping_handlers"
    )
    async def on_payment_charged(message):
        """
        Create shipping label after payment succeeds.
        Last step in the choreography - no compensation needed.
        """
        event_data = message.data
        order_id = event_data["order_id"]

        try:
            # Get order details
            order = await ctx.db.query_one(
                "SELECT id, shipping_address FROM tb_order WHERE id = $1",
                [order_id]
            )

            # Create shipping label
            label = await create_shipping_label(
                order_id=order_id,
                address=order["shipping_address"]
            )

            # Store label
            await ctx.db.execute(
                """
                INSERT INTO tb_shipping (order_id, label_id, carrier, tracking, created_at)
                VALUES ($1, $2, $3, $4, NOW())
                """,
                [order_id, label.id, label.carrier, label.tracking_number]
            )

            # Publish completion event
            await fraiseql.nats.publish(
                subject="events.order.shipped",
                data={
                    "order_id": order_id,
                    "tracking_number": label.tracking_number
                }
            )

            await message.ack()

        except Exception as e:
            # Log for manual intervention
            await log_error({
                "order_id": order_id,
                "step": "shipping",
                "error": str(e)
            })
            # Don't ack - manual retry needed
            await message.nak(timeout=60000)

Event monitoring:

    import json  # needed to serialize event data for the log table

    # Track event propagation
    @fraiseql.nats.subscribe(
        subject="events.>",
        consumer_group="event_monitoring"
    )
    async def monitor_events(message):
        """Monitor all events for debugging."""
        event_type = message.subject
        data = message.data

        await ctx.db.execute(
            """
            INSERT INTO tb_event_log (event_type, order_id, data, recorded_at)
            VALUES ($1, $2, $3::jsonb, NOW())
            """,
            [event_type, data.get("order_id"), json.dumps(data)]
        )

        # Alert on failures
        if "failed" in event_type:
            await send_alert({
                "severity": "warning",
                "event": event_type,
                "details": data
            })

        await message.ack()

Integration tests:

    import asyncio
    import pytest

    @pytest.mark.asyncio
    async def test_full_order_choreography():
        """Test complete happy path through all services."""
        # Create order
        order = await order_service.create_order(
            user_id="user-1",
            items=[{"sku": "WIDGET", "quantity": 2}],
            total=49.99
        )

        # Give services time to process
        await asyncio.sleep(1.0)

        # Verify final state
        final_order = await get_order(order.id)
        assert final_order.status == "shipped"

        # Verify all events were published
        events = await get_event_history(order.id)
        assert "order.created" in events
        assert "inventory.reserved" in events
        assert "payment.charged" in events
        assert "order.shipped" in events
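A fixed sleep makes the test either slow or flaky, depending on how fast the services happen to run. A possible alternative is to poll for the expected state with a deadline. The wait_until helper below is a hypothetical test utility, not part of pytest or FraiseQL.

```python
import asyncio
import time


async def wait_until(predicate, timeout=10.0, interval=0.1):
    """Poll an async predicate until it returns True or the timeout expires.

    Hypothetical test helper; returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if await predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)
```

In a test, the predicate could fetch the order with get_order and compare its status to "shipped". The assertion then passes as soon as the choreography finishes, and fails only once the full timeout has elapsed.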
    @pytest.mark.asyncio
    async def test_payment_failure_compensation():
        """Test compensation when payment fails."""
        order = await order_service.create_order(
            user_id="user-with-bad-card",
            items=[{"sku": "WIDGET", "quantity": 2}],
            total=49.99
        )

        # Payment will fail due to bad card
        await asyncio.sleep(1.0)

        # Verify compensation occurred
        final_order = await get_order(order.id)
        assert final_order.status == "failed"

        # Verify inventory was released
        inventory = await get_inventory("WIDGET")
        assert inventory.reserved == 0

Start all services with Docker Compose:

    docker-compose up -d

Verify services are running:

    docker-compose ps

Expected output:

    NAME                                             STATUS         PORTS
    microservices-choreography-postgres-1            Up (healthy)   0.0.0.0:5432->5432/tcp
    microservices-choreography-nats-1                Up (healthy)   0.0.0.0:4222->4222/tcp
    microservices-choreography-order-service-1       Up             0.0.0.0:8080->8080/tcp
    microservices-choreography-inventory-service-1   Up
    microservices-choreography-payment-service-1     Up
    microservices-choreography-shipping-service-1    Up

Check NATS is ready:

    # Verify NATS JetStream is enabled
    docker-compose exec nats nats server info

Look for jetstream: enabled: true in output.
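The readiness checks above can also be scripted. The sketch below is a coarse probe that only tests whether the published TCP ports accept connections. The helper names are hypothetical, and an open port does not prove that PostgreSQL has finished initializing or that JetStream is enabled, so treat it as a first gate rather than a full health check.

```python
import asyncio


async def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
    except (OSError, asyncio.TimeoutError):
        return False
    writer.close()
    await writer.wait_closed()
    return True


async def wait_for_ports(targets, attempts=30, delay=1.0):
    """Retry until every (host, port) pair in targets accepts connections."""
    for _ in range(attempts):
        results = await asyncio.gather(
            *(port_is_open(host, port) for host, port in targets)
        )
        if all(results):
            return True
        await asyncio.sleep(delay)
    return False
```

For the stack above, the targets would be ("localhost", 5432), ("localhost", 4222) and ("localhost", 8080), run with asyncio.run(wait_for_ports(targets)).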
Create a test order:
    curl -X POST http://localhost:8080/graphql \
      -H "Content-Type: application/json" \
      -d '{
        "query": "mutation { createOrder(userId: \"user-123\", items: [{sku: \"WIDGET\", quantity: 2}], total: 49.99) { id status } }"
      }'

Expected response:

    {
      "data": {
        "createOrder": {
          "id": "ord-abc123",
          "status": "pending"
        }
      }
    }

Wait for choreography to complete (5-10 seconds for all services):

    sleep 5

Verify order reached final state:

    curl -X POST http://localhost:8080/graphql \
      -H "Content-Type: application/json" \
      -d '{
        "query": "query { order(id: \"ord-abc123\") { id status trackingNumber } }"
      }'

Expected response:

    {
      "data": {
        "order": {
          "id": "ord-abc123",
          "status": "shipped",
          "trackingNumber": "TRK-789XYZ"
        }
      }
    }

Check event log for complete flow:

    docker-compose exec postgres psql -U fraiseql -d orders -c \
      "SELECT event_type, order_id, recorded_at FROM tb_event_log WHERE order_id = 'ord-abc123' ORDER BY recorded_at;"

Expected output:

    event_type                | order_id   | recorded_at
    --------------------------+------------+---------------------
    events.order.created      | ord-abc123 | 2024-01-15 10:30:00
    events.inventory.reserved | ord-abc123 | 2024-01-15 10:30:01
    events.payment.charged    | ord-abc123 | 2024-01-15 10:30:02
    events.order.shipped      | ord-abc123 | 2024-01-15 10:30:05

Test compensation (failure scenario):

    # Create order with insufficient inventory
    curl -X POST http://localhost:8080/graphql \
      -H "Content-Type: application/json" \
      -d '{
        "query": "mutation { createOrder(userId: \"user-123\", items: [{sku: \"OUTOFSTOCK\", quantity: 999}], total: 9999.99) { id } }"
      }'

Wait and verify order failed:

    sleep 2
    curl -X POST http://localhost:8080/graphql \
      -H "Content-Type: application/json" \
      -d '{"query": "query { order(id: \"ord-failed\") { status } }"}'

Expected: {"status": "failed"}
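The failure scenario depends on two inventory rules: an order that cannot be fully satisfied should reserve nothing, and a compensation should return the reserved count to where it started. The in-memory sketch below illustrates both. The Inventory class is a hypothetical stand-in for tb_inventory, and in a real database the check and the update would need to run inside a single transaction.

```python
class InsufficientStock(Exception):
    """Raised when an order cannot be fully reserved."""


class Inventory:
    """In-memory stand-in for tb_inventory: sku -> quantity / reserved."""

    def __init__(self, stock):
        self.rows = {
            sku: {"quantity": qty, "reserved": 0} for sku, qty in stock.items()
        }

    def available(self, sku):
        row = self.rows[sku]
        return row["quantity"] - row["reserved"]

    def reserve(self, items):
        """Reserve every item or none: validate first, then apply."""
        wanted = {}
        for item in items:  # sum quantities in case a sku appears twice
            wanted[item["sku"]] = wanted.get(item["sku"], 0) + item["quantity"]
        for sku, qty in wanted.items():
            if sku not in self.rows or self.available(sku) < qty:
                raise InsufficientStock(sku)
        for sku, qty in wanted.items():
            self.rows[sku]["reserved"] += qty

    def release(self, items):
        """Compensation: undo a previous reservation."""
        for item in items:
            row = self.rows[item["sku"]]
            row["reserved"] = max(0, row["reserved"] - item["quantity"])
```

Validating the whole order before changing any row avoids the situation where the first items are reserved and a later item then fails, which would leave stock reserved for an order that never proceeds.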
Run integration tests:
    pytest tests/test_choreography.py -v

Expected output:

    tests/test_choreography.py::test_full_order_choreography PASSED
    tests/test_choreography.py::test_payment_failure_compensation PASSED
    tests/test_choreography.py::test_inventory_failure PASSED

If services do not start:

    docker-compose up -d
    docker-compose logs order-service

Common issues:

The database is not ready: depends_on only controls start order, but PostgreSQL needs time to initialize.

NATS is not ready; check its logs:

    docker-compose logs nats

If order doesn’t progress:
Check NATS consumers:
    docker-compose exec nats nats consumer report

Verify consumers are connected:

    docker-compose exec nats nats consumer info events inventory_processors

Check service logs:

    docker-compose logs inventory-service | tail -20
    docker-compose logs payment-service | tail -20

Manual event inspection:

    docker-compose exec nats nats sub events.order.created

If tb_event_log is empty:
Verify NATS publishing is working:
    # In an asyncio-enabled Python console (python -m asyncio),
    # which allows top-level await
    import asyncio
    from fraiseql import nats
    await nats.publish("test.subject", {"test": "data"})

Check NATS stream exists:

    docker-compose exec nats nats stream report

If payment failure doesn’t release inventory:
Check compensation handler is registered:
    docker-compose logs payment-service | grep "compensation"

Verify events.payment.failed subject:

    docker-compose exec nats nats sub events.payment.failed

If choreography is slow:
Check consumer lag:
    docker-compose exec nats nats consumer info events inventory_processors

Look for num_pending (should be near 0).
Increase concurrency:
    @fraiseql.nats.subscribe(
        subject="events.order.created",
        consumer_group="inventory_processors",
        max_concurrent=20  # Increase from 10
    )

Monitor database connections:

    docker-compose exec postgres psql -U fraiseql -c \
      "SELECT count(*) FROM pg_stat_activity;"

If tests fail:
Ensure all services are healthy:
    docker-compose ps

Clear test data between runs:

    docker-compose exec postgres psql -U fraiseql -d orders -c \
      "TRUNCATE tb_order, tb_event_log CASCADE;"

Increase sleep time in tests if services are slow:

    await asyncio.sleep(2.0)  # Instead of 1.0

Advanced Federation
Cross-service queries and data composition.
Advanced NATS
Reliability patterns and JetStream.
Custom Resolvers
Service-specific business logic.