
Federation & NATS Troubleshooting

This guide covers solutions for common issues when using FraiseQL’s federation and NATS capabilities.

“Failed to connect to database”

Symptoms: DatabaseConnectionError: Failed to connect to database 'inventory'

Causes:

  • Network connectivity issue
  • Database server is down
  • Wrong credentials in config
  • Firewall blocking connection

Solutions:

```bash
# 1. Test database connectivity
fraiseql health --database inventory

# 2. Check configuration
grep -A 5 "\[databases.inventory\]" fraiseql.toml

# 3. Verify credentials
echo $INVENTORY_DATABASE_URL  # Check URL format

# 4. Test manual connection
psql $INVENTORY_DATABASE_URL -c "SELECT 1"

# 5. Enable connection debug logging: set FRAISEQL_LOG=debug
```

Or configure the database in fraiseql.toml:

```toml
[database]
url = "${INVENTORY_DATABASE_URL}"
pool_max = 10
```
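A surprising number of connection failures come down to a malformed URL. As a quick sanity check, the DSN can be parsed before the app ever tries to connect; this sketch assumes a standard `postgresql://` URL and uses only the standard library:

```python
from urllib.parse import urlparse

def check_database_url(url: str) -> list[str]:
    """Return a list of problems found in a Postgres connection URL."""
    problems = []
    parsed = urlparse(url)
    if parsed.scheme not in ("postgres", "postgresql"):
        problems.append(f"unexpected scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        problems.append("missing hostname")
    if not parsed.path or parsed.path == "/":
        problems.append("missing database name")
    if parsed.port is None:
        problems.append("no port given (server default 5432 will be assumed)")
    return problems

# A well-formed URL produces no findings:
print(check_database_url("postgresql://app:secret@db.internal:5432/inventory"))  # → []
```

Running the same check on `postgresql:///inventory` flags the missing hostname, which is the typical symptom of an unset environment variable being interpolated into the URL.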

“Pool exhausted - no available connections”


Symptoms: PoolExhaustedError: No available connections in pool for database 'inventory'

Causes:

  • Pool size too small for load
  • Connections leaking (not being released)
  • Long-running queries blocking other requests
  • Deadlock in pool acquisition

Solutions:

```toml
# 1. Increase pool size
[databases.inventory]
pool_size = 50        # Was 10, now 50
pool_timeout = 30000  # Timeout waiting for a connection (ms)

[databases.inventory.idle]
max_age = 3600000     # Close idle connections after 1 hour
```

```bash
# 2. Monitor the connection pool via the metrics endpoint:
# GET /metrics → fraiseql_database_pool_active, fraiseql_database_pool_idle

# 3. Identify slow queries: set a slow-query threshold in fraiseql.toml:
# [database]
# log_slow_queries_ms = 1000  # Log queries slower than 1 second
# Or use FRAISEQL_LOG=debug for full query logging.
```
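Connection leaks usually come from code paths that acquire a connection but skip the release on error. The fix is structural: acquire through a context manager so the release runs even when the handler raises. A minimal sketch (the pool class here is illustrative, not FraiseQL's API):

```python
import asyncio
from contextlib import asynccontextmanager

class TinyPool:
    """Illustrative fixed-size pool; real pools add health checks and reconnects."""
    def __init__(self, size: int):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")

    @asynccontextmanager
    async def acquire(self, timeout: float = 5.0):
        # Waiting with a timeout turns silent exhaustion into a loud error.
        conn = await asyncio.wait_for(self._free.get(), timeout)
        try:
            yield conn
        finally:
            # Released even when the caller raises; this is what prevents leaks.
            self._free.put_nowait(conn)

async def main():
    pool = TinyPool(size=2)
    async with pool.acquire() as conn:
        print("using", conn)
    print("free connections:", pool._free.qsize())

asyncio.run(main())
```

If pool exhaustion persists after this pattern is applied everywhere, the pool really is too small for the load, and the `pool_size` increase above is the right lever.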

“Federated query timed out”

Symptoms: FederationTimeoutError: Federated query to 'inventory' timed out after 5000ms

Causes:

  • Network latency between databases
  • Query on remote database is slow
  • Default timeout too aggressive

Solutions:

```toml
# 1. Increase the federation timeout
[federation]
default_timeout = 10000  # Increased from 5000ms
batch_size = 50          # Smaller batches = faster queries

# 2. Per-database timeouts
[federation.database_timeouts]
inventory = 10000
payments = 15000  # Slower database needs more time
```

```python
# 3. Keep routing in configuration. The Python type declares the shape only;
# there is no database routing in the decorator:
@fraiseql.type
class Order:
    id: ID
    items: list[OrderItem]  # Federated from inventory DB; see fraiseql.toml
```

```sql
-- 4. Add database indexes on the inventory database:
CREATE INDEX idx_order_item_order_id ON tb_order_item(order_id);
```
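When raising the configured timeout is not enough, the same back-pressure can be applied in application code and combined with graceful degradation. A sketch using plain asyncio (the `fetch_inventory_items` function is a hypothetical stand-in for a federated call):

```python
import asyncio

async def fetch_inventory_items(order_ids: list[str]) -> dict:
    # Stand-in for a federated query to the inventory database.
    await asyncio.sleep(0.01)
    return {oid: [] for oid in order_ids}

async def fetch_with_timeout(order_ids: list[str], timeout_s: float = 10.0) -> dict:
    try:
        return await asyncio.wait_for(fetch_inventory_items(order_ids), timeout_s)
    except asyncio.TimeoutError:
        # Degrade gracefully instead of failing the whole query:
        # callers get None and can render a partial response.
        return {oid: None for oid in order_ids}

print(asyncio.run(fetch_with_timeout(["o1", "o2"])))  # → {'o1': [], 'o2': []}
```

Returning partial data on timeout is a design choice: it keeps the primary query fast at the cost of occasionally missing federated fields, which suits dashboards better than invoicing.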

“Circular reference detected”

Symptoms: CircularReferenceError when loading schema or executing deeply nested queries.

Causes:

  • Bidirectional federated references
  • Deeply nested federated queries

Solutions:

```python
# BAD: Circular reference. Avoid federating back to the originating type.
@fraiseql.type
class Order:
    items: list[OrderItem]  # Federated from inventory DB

@fraiseql.type
class OrderItem:
    order: Order  # Circular! OrderItem federates back to Order

# GOOD: Only federate in one direction.
@fraiseql.type
class Order:
    items: list[OrderItem]  # Federated from inventory DB (one way only)

@fraiseql.type
class OrderItem:
    order_id: ID  # Just store the ID; don't federate back to Order
```
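A schema's federated references form a directed graph, so the one-direction rule can be checked mechanically before deployment. A small depth-first-search sketch (the type names and edges are illustrative, not read from a real schema):

```python
def find_cycle(graph: dict[str, list[str]]) -> bool:
    """DFS with a recursion stack; True if any reference cycle exists."""
    visiting, done = set(), set()

    def visit(node: str) -> bool:
        if node in visiting:
            return True  # back-edge: we re-entered a node still on the stack
        if node in done:
            return False
        visiting.add(node)
        if any(visit(n) for n in graph.get(node, [])):
            return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(n) for n in graph)

print(find_cycle({"Order": ["OrderItem"], "OrderItem": ["Order"]}))  # → True
print(find_cycle({"Order": ["OrderItem"], "OrderItem": []}))         # → False
```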

“Inconsistent foreign keys across databases”


Symptoms: ForeignKeyError: Order references Product ID that doesn't exist

Causes:

  • Data deleted in one database but not cascaded
  • Race condition between databases
  • Data inconsistency

Solutions:

```python
# 1. Validate before federating
@fraiseql.mutation
async def add_item_to_order(order_id: ID, product_id: ID) -> OrderItem:
    """Validate that the product exists before adding it."""
    # Check that the product exists in the inventory database
    product = await fraiseql.query(
        f"""
        query {{
          product(id: "{product_id}") {{
            id
          }}
        }}
        """
    )
    if not product:
        raise ValueError(f"Product {product_id} not found")
    # Now safe to create the item
    return await create_order_item(order_id, product_id)
```

```sql
-- 2. Enforce the check on the primary database. PostgreSQL does not allow
-- subqueries in CHECK constraints, so use a trigger instead:
CREATE FUNCTION check_product_exists() RETURNS trigger AS $$
BEGIN
  IF NOT EXISTS (
    SELECT 1 FROM v_product_from_inventory WHERE id = NEW.product_id
  ) THEN
    RAISE EXCEPTION 'Product % does not exist', NEW.product_id;
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER fk_product_exists
  BEFORE INSERT OR UPDATE ON tb_order_item
  FOR EACH ROW EXECUTE FUNCTION check_product_exists();
```

“Saga compensation failed”

Symptoms: SagaCompensationError: Compensation step 'reserve_inventory' failed

Causes:

  • Compensation function has bugs
  • Database state changed unexpectedly
  • Compensation takes too long (timeout)

Solutions:

```python
# 1. Add detailed compensation logging
@compensate("reserve_inventory")
async def compensate_reserve_inventory(ctx):
    """Release reserved inventory with logging."""
    try:
        logging.info(f"Compensating reservations: {ctx.reservations}")
        for reservation_id in ctx.reservations:
            logging.debug(f"Releasing reservation {reservation_id}")
            await execute_sql(
                "UPDATE tb_reservation SET status = 'released' WHERE id = $1",
                [reservation_id]
            )
            logging.debug(f"Released reservation {reservation_id}")
    except Exception as e:
        logging.error(f"Compensation failed: {e}", exc_info=True)
        # Re-raise so the saga knows it failed
        raise SagaCompensationError(
            step="reserve_inventory",
            reason=str(e)
        ) from e

# 2. Make compensation idempotent
@compensate("process_payment")
async def compensate_payment(ctx):
    """Refund payment (idempotent)."""
    transaction = await execute_sql(
        "SELECT status FROM tb_transaction WHERE id = $1",
        [ctx.transaction_id]
    )
    # Only refund if not already refunded
    if transaction['status'] == 'refunded':
        logging.info(f"Payment already refunded: {ctx.transaction_id}")
        return
    # Process the refund
    await refund_payment(ctx.transaction_id)
    # Mark as refunded
    await execute_sql(
        "UPDATE tb_transaction SET status = 'refunded' WHERE id = $1",
        [ctx.transaction_id]
    )

# 3. Increase the timeout for compensation
@saga(
    steps=["create_order", "reserve_inventory"],
    compensation_timeout=30000  # 30 seconds for compensation
)
async def create_order(...):
    pass
```

“Saga stuck in pending status”

Symptoms: Order created but saga never completes; stuck in pending status.

Causes:

  • One saga step is hanging
  • Network issue between databases
  • Database deadlock

Solutions:

```bash
# 1. Monitor saga/mutation progress via structured logs.
# Set FRAISEQL_LOG=debug to see each request with its requestId.
# Use pg_notify observers in fraiseql.toml to track step completions:
# [[observers]]
# channel = "saga_step_completed"
# nats_subject = "fraiseql.saga.step"
```

```python
# 2. Implement a saga timeout
@saga(
    steps=["create_order", "reserve_inventory"],
    timeout=60000  # Overall saga timeout: 60 seconds
)
async def create_order(...):
    pass
```

```sql
-- 3. Check for stuck sagas in the database
SELECT *
FROM tb_saga_execution
WHERE status = 'pending'
  AND created_at < NOW() - INTERVAL '5 minutes'
ORDER BY created_at DESC;
```

```python
# 4. Manual cleanup of stuck sagas
@fraiseql.mutation
async def cleanup_stuck_saga(saga_id: ID) -> bool:
    """Manually trigger compensation for a stuck saga."""
    saga = await execute_sql(
        "SELECT * FROM tb_saga_execution WHERE id = $1",
        [saga_id]
    )
    if saga['status'] != 'pending':
        raise ValueError(f"Saga not pending: {saga['status']}")
    # Trigger compensations in reverse order
    for step in reversed(saga['completed_steps']):
        await trigger_compensation(step, saga)
    await execute_sql(
        "UPDATE tb_saga_execution SET status = 'compensated' WHERE id = $1",
        [saga_id]
    )
    return True

“Slow federated queries”

Symptoms: Query with federated field takes 10+ seconds

Causes:

  • Network latency
  • Missing indexes
  • Cartesian product (N+1 problem)
  • Query hitting large tables

Solutions:

```python
# 1. Check that federation is batching correctly
@fraiseql.query
def orders_with_items(limit: int = 100) -> list[Order]:
    """
    With batching this should be 2 queries total:
    - 1 query: SELECT * FROM orders LIMIT 100
    - 1 query: SELECT * FROM order_items WHERE order_id IN (...)
    """
    pass

# Enable query logging on each database to verify
await fraiseql.enable_query_logging(database="primary")
await fraiseql.enable_query_logging(database="inventory")

# 2. Denormalize to reduce federated queries
@fraiseql.type
class Order:
    item_count: int  # Denormalized count from a SQL view
    # Federated from inventory DB; configured in fraiseql.toml
    items: list[OrderItem]

# 3. Use selective federation
@fraiseql.query
def order_summary(id: ID) -> dict:
    """Return only the needed fields to avoid full federation."""
    return {
        "id": order.id,
        "total": order.total,
        "item_count": order.item_count
        # Don't federate full items if they aren't needed
    }
```

```sql
-- 4. Add indexes on the inventory database
CREATE INDEX idx_order_items_order_id_product_id
  ON tb_order_item(order_id, product_id);
```
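The batching check in step 1 is the classic N+1 fix: collect the keys first, then issue a single `IN` query and regroup the rows in memory. A language-level sketch with the database access stubbed out (the fetch function is hypothetical):

```python
def fetch_items_batched(order_ids: list[str], fetch_many) -> dict[str, list[dict]]:
    """One query for all orders instead of one query per order."""
    rows = fetch_many(order_ids)  # e.g. SELECT ... WHERE order_id IN (...)
    by_order: dict[str, list[dict]] = {oid: [] for oid in order_ids}
    for row in rows:
        by_order[row["order_id"]].append(row)
    return by_order

# Stub standing in for the inventory database:
def fake_fetch_many(order_ids):
    return [{"order_id": oid, "sku": f"sku-{oid}"} for oid in order_ids]

print(fetch_items_batched(["o1", "o2"], fake_fetch_many))
```

Pre-seeding `by_order` with every requested ID also makes "order with no items" explicit (an empty list) instead of a missing key, which avoids a common KeyError downstream.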

“Failed to connect to NATS server”

Symptoms: NatsConnectionError: Failed to connect to NATS server

Causes:

  • NATS server not running
  • Wrong URL/port
  • Firewall blocking

Solutions:

```bash
# 1. Check NATS server status
nats server info

# 2. Test connection
nats ping

# 3. Check configuration
grep -A 3 "\[nats\]" fraiseql.toml

# 4. Verify URL format
echo $NATS_URL  # Should be: nats://host:4222

# 5. Start NATS if not running
docker run -it --rm -p 4222:4222 nats
```
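A common variant of this failure is a startup race: the application boots before NATS is reachable. A hedged retry-on-connect sketch, with the actual connect call injected so the pattern is not tied to any particular client library:

```python
import time

def connect_with_retry(connect, attempts: int = 5, base_delay: float = 0.5):
    """Call connect() with exponential backoff; re-raise after the last attempt."""
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError as e:
            if attempt == attempts - 1:
                raise
            delay = base_delay * 2 ** attempt
            print(f"connect failed ({e}); retrying in {delay:.2f}s")
            time.sleep(delay)

# Stub that fails twice, then succeeds:
calls = {"n": 0}
def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("connection refused")
    return "connected"

print(connect_with_retry(flaky_connect, base_delay=0.01))  # → connected
```

Only retry on connection errors; authentication errors (next section) will not fix themselves and should fail fast.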

“NATS authentication failed”

Symptoms: AuthorizationError: NATS authentication failed

Causes:

  • Wrong token/credentials
  • Expired credentials
  • Insufficient permissions

Solutions:

```toml
# 1. Update credentials
[nats.auth]
type = "token"
token = "${NATS_TOKEN}"  # Ensure the env var is set
```

```bash
# 2. Verify the token
echo $NATS_TOKEN
```

```toml
# 3. Use NKey authentication (more secure)
[nats.auth]
type = "nkey"
nkey = "${NATS_NKEY}"
```

```bash
# 4. Generate new credentials
nats user create fraiseql-user
nats nkey gen user -o fraiseql.nk  # NKey
```

“Stream not found”

Symptoms: StreamNotFoundError: Stream 'orders' not found

Causes:

  • Stream not created
  • Stream name mismatch
  • Configuration issue

Solutions:

```bash
# 1. List existing streams
nats stream list

# 2. Check stream configuration
nats stream info orders

# 3. Create the missing stream
nats stream add orders \
  --subjects "fraiseql.order.>" \
  --max-msgs 1000000 \
  --max-bytes 10GB \
  --retention limits
```

```toml
# 4. Ensure the stream is configured in fraiseql.toml
[nats.jetstream.streams.orders]
subjects = ["fraiseql.order.>"]
replicas = 3
max_msgs = 1000000
max_bytes = 10737418240  # 10 GiB
```

“Consumer lagging behind”

Symptoms: Consumer far behind in processing; queue backs up

Causes:

  • Consumer processing is slow
  • Consumer crashed/restarted
  • Not enough instances of consumer

Solutions:

```bash
# 1. Check consumer status
nats consumer info orders order-processor
# Output shows:
#   Pending: 50000   # Many messages waiting
#   Delivered: 1000
#   Acked: 800

# 2. Increase processing capacity
# Scale up the consumer service: 1 instance -> 3 instances

# 3. Check the consumer queue group
nats consumer info orders order-processor
```

```toml
# 4. Increase ack wait if processing is slow
[nats.jetstream.consumers.order-processor]
ack_wait = "60s"  # Increased from 30s
```

```python
# 5. Optimize the event handler
@subscribe("fraiseql.order.created", queue_group="order-processors")
async def process_order(event: dict):
    """Process the order efficiently."""
    order_id = event["data"]["order_id"]
    # Batch process if possible
    cache_key = f"processed:{order_id}"
    if await redis.exists(cache_key):
        return  # Already processed
    # Do the work
    await process_order_work(event)
    # Mark as done
    await redis.set(cache_key, "true", ex=3600)
```

“Events not received by subscribers”

Symptoms: Event published but subscribers don’t receive it

Causes:

  • Subscriber not running
  • Subject mismatch
  • Consumer has unprocessed limit

Solutions:

```bash
# 1. Check consumer status
nats consumer info orders order-processor
# Look for:
# - NumPending (messages waiting)
# - NumAckPending (unacked messages)

# 2. Check that the subject matches
# Published to:  fraiseql.order.created
# Subscribed to: fraiseql.order.>  (match)
# Subscribed to: orders.created    (no match)
```

```python
# 3. Verify the subscriber is running
@subscribe("fraiseql.order.created")
async def handle_order_created(event: dict):
    """Handle order created."""
    logging.info(f"Received order event: {event['order_id']}")
    # If this log never appears, the subscriber isn't running
```

```toml
# 4. Check the max deliver limit. If a message is redelivered more than
# max_deliver times, it goes to the dead letter queue.
[nats.jetstream.consumers.order-processor]
max_deliver = 3  # Redelivered at most 3 times
```

```python
# Check the dead letter queue
@subscribe("fraiseql.dlq")
async def handle_dead_letter(event: dict):
    logging.error(f"Dead letter: {event}")
```

“Events arrive out of order”

Symptoms: Status changed events arrive before creation event

Causes:

  • Multiple consumer instances processing same events
  • Network reordering
  • Consumer group distributing across instances

Solutions:

```python
# 1. Process events sequentially per order
@subscribe("fraiseql.order.>", queue_group="order-seq")
async def process_order_event_sequentially(event: dict):
    """Process events for a given order one at a time."""
    order_id = event["data"]["order_id"]
    # Use a distributed lock per order
    async with distributed_lock(f"order:{order_id}"):
        # Only one process can handle events for this order at a time
        await process_event(event)
```

```toml
# 2. OR partition by order ID to ensure ordering
[nats.partitions]
enabled = true
key = "order_id"  # The same order always goes to the same partition
count = 8

# 3. Use a durable consumer with explicit ack
[nats.jetstream.consumers.order-processor]
deliver_policy = "all"   # Start from the beginning
ack_policy = "explicit"  # Must explicitly ACK
ack_wait = "30s"         # Timeout if not ACKed
max_deliver = 3          # Retry 3 times
```
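Within a single process, the per-order lock above reduces to one `asyncio.Lock` per key: events for the same order serialize, while different orders still run concurrently. A sketch (the `distributed_lock` helper in the example above is assumed to be provided elsewhere; this version is process-local only):

```python
import asyncio
from collections import defaultdict

locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
log: list[str] = []

async def handle(order_id: str, event: str, delay: float):
    async with locks[order_id]:  # serializes events per order only
        await asyncio.sleep(delay)
        log.append(f"{order_id}:{event}")

async def main():
    # "created" is slow, but it still lands before "paid" for the same order,
    # because both queue on the same per-order lock.
    await asyncio.gather(
        handle("o1", "created", 0.05),
        handle("o1", "paid", 0.0),
        handle("o2", "created", 0.0),
    )

asyncio.run(main())
print(log)
```

The same idea scaled across instances is exactly what the partition config gives you: a stable key-to-worker mapping replaces the lock.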

“Duplicate event processing”

Symptoms: Same event processed multiple times; duplicate orders created

Causes:

  • No idempotency checks
  • At-least-once delivery semantics
  • Retry without deduplication

Solutions:

```python
# 1. Implement an idempotent handler
@subscribe("fraiseql.order.created")
async def handle_order_created_idempotent(event: dict):
    """Process the event idempotently."""
    # Generate a deterministic event ID
    event_id = f"{event['order_id']}-{event['timestamp']}"
    # Check whether it was already processed
    if await is_processed(event_id):
        logging.debug(f"Event already processed: {event_id}")
        return
    # Process
    try:
        await create_order(event)
        await mark_processed(event_id)
    except Exception as e:
        logging.error(f"Error processing event: {e}")
        raise  # Let NATS retry

async def is_processed(event_id: str) -> bool:
    """Check whether this event ID was already processed."""
    return await redis.exists(f"event_processed:{event_id}")

async def mark_processed(event_id: str):
    """Mark the event as processed (idempotency key)."""
    await redis.setex(
        f"event_processed:{event_id}",
        30 * 24 * 3600,  # 30-day TTL
        "true"
    )

# 2. Use an event version/correlation ID
event = {
    "id": "evt_550e8400",                # Unique event ID
    "correlation_id": "order_550e8400",  # Links to the entity
    "version": 1,                        # Version for this event type
    "type": "order.created",
    "data": {...}
}
```
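A deterministic event ID can also be derived from the payload itself, which protects against producers that forget to attach one. A sketch using a content hash over a canonical JSON encoding:

```python
import hashlib
import json

def event_fingerprint(event: dict) -> str:
    """Stable ID from event content; identical payloads collapse to one key."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

a = {"type": "order.created", "order_id": "o1", "timestamp": 1}
b = {"order_id": "o1", "timestamp": 1, "type": "order.created"}  # same content, different key order
print(event_fingerprint(a) == event_fingerprint(b))  # → True
```

Note the trade-off: a content hash deduplicates identical payloads, but two genuinely distinct events with identical content (possible without a timestamp field) would also collide, so keep a timestamp or sequence number in the payload.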

“Saga completes but event never publishes”


Symptoms: Order created successfully but notification service doesn’t receive event

Causes:

  • Event publish happens after saga completes but before response
  • NATS publish fails silently
  • Network partition after federation but before NATS

Solutions:

```python
# 1. Wait for event confirmation
@saga.step("publish_confirmation")
async def step_publish_confirmation(ctx):
    """Final step: publish confirmation with retry."""
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            await publish(
                subject="fraiseql.order.confirmed",
                data={...},
                timeout=5000
            )
            return
        except Exception as e:
            retry_count += 1
            if retry_count >= max_retries:
                raise
            wait_time = 2 ** retry_count  # Exponential backoff
            await asyncio.sleep(wait_time)

# 2. Store events in the database for guaranteed delivery (outbox pattern)
@saga.step("store_events")
async def step_store_events(ctx):
    """Store events in the DB for async publishing."""
    await execute_sql(
        """
        INSERT INTO tb_pending_events (order_id, event_type, payload)
        VALUES ($1, $2, $3)
        """,
        [ctx.order_id, "order.created", json.dumps({...})]
    )
    # A separate service publishes from the DB to NATS,
    # so the event is guaranteed to eventually publish.

# 3. Add publishing monitoring
@fraiseql.after_mutation("create_order")
async def after_create_order_monitored(order):
    """Publish with monitoring."""
    try:
        await metrics.time("event.publish", async_fn=lambda: publish(...))
    except Exception as e:
        await metrics.increment("event.publish.error")
        await alert_ops(f"Failed to publish event for order {order.id}")
```

“Race condition: Event arrives before federation completes”


Symptoms: Notification service processes event but queries return empty

Causes:

  • Event subscriber queries federation before saga completes
  • Event published before database transaction commits
  • Clock skew or timing issue

Solutions:

```python
# 1. Only publish after the transaction commits
@saga.step("final_publish")
async def step_final_publish(ctx):
    """Publish only after all federation steps complete."""
    # All federation updates are committed by this point;
    # this is the final step, so publish here.
    await publish(
        subject="fraiseql.order.confirmed",
        data={...}
    )

# 2. Have the event handler wait for data availability
@subscribe("fraiseql.order.created")
async def handle_order_created_with_retry(event: dict):
    """Handle the event, retrying until the data is available."""
    order_id = event["data"]["order_id"]
    # Retry if the order is not yet visible
    max_retries = 5
    for attempt in range(max_retries):
        order = await fraiseql.query(
            f'query {{ order(id: "{order_id}") {{ id }} }}'
        )
        if order:
            await process_order(order)
            return
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Backoff
    logging.error(f"Order not found after retries: {order_id}")

# 3. Include the data in the event (denormalization)
# so subscribers don't have to query federation at all
await publish(
    subject="fraiseql.order.created",
    data={
        "order_id": order.id,
        "customer_id": order.customer_id,
        "total": str(order.total),
        "items": [...]  # Include the items in the event;
                        # no need for the subscriber to federate
    }
)
```

```sql
-- Problem: Saga holds a lock while federation waits
-- Solution: Use a lower isolation level
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
```
Monitoring and Prevention Checklist

  • Enable query logging for all databases
  • Monitor federation query latency (p50, p95, p99)
  • Track NATS message throughput and lag
  • Monitor saga completion rates and failures
  • Set up alerts for dead letter queues
  • Track event processing latency
  • Monitor connection pool exhaustion
  • Check for circular federation references
  • Verify event handler idempotency
  • Test failure scenarios regularly

Federation Reference

Complete reference documentation for FraiseQL’s federation capabilities. Federation Guide

NATS Reference

Reference documentation for NATS integration and JetStream configuration. NATS Guide

Error Handling

Patterns for handling errors in federated and event-driven applications. Error Handling Guide