Observers and webhooks are powerful independently, but when combined they create sophisticated event-driven architectures. This guide shows production patterns for integrating these features.
Observers react to database mutations, and webhooks send external notifications. Together they let a database change trigger an HTTP call to an external service:
    from fraiseql import type, observer, webhook

    @type
    class Order:
        id: ID
        customer_email: str
        total: float
        status: str

    @observer(
        entity="Order",
        event="INSERT",
        actions=[
            webhook(
                url="https://api.example.com/notifications",
                method="POST",
                headers={"Authorization": "Bearer ${WEBHOOK_API_KEY}"},
                body={
                    "event": "order.created",
                    "order_id": "{{ id }}",
                    "customer_email": "{{ customer_email }}",
                    "total": "{{ total }}",
                    "timestamp": "{{ _timestamp }}",
                },
            )
        ],
    )
    def on_order_created():
        """Notify external service when order is created."""
        pass

Execution flow: an INSERT on Order fires the observer, which sends a POST request with the rendered body to the configured URL.
Only trigger webhooks for specific conditions:
    @observer(
        entity="Order",
        event="UPDATE",
        condition="status.changed() && status == 'shipped'",
        actions=[
            webhook(
                url="https://shipping.example.com/tracking",
                body={
                    "order_id": "{{ id }}",
                    "tracking_number": "{{ tracking_number }}",
                    "customer_email": "{{ customer_email }}",
                },
            )
        ],
    )
    def notify_shipping_update():
        """Only notify when status changes to 'shipped'."""
        pass

Condition types:
- `field.changed()`: any change to the field
- `field.old == value && field.new == other`: a specific state transition
- `total > 1000`: business logic
- `(status == 'shipped' && total > 500) || is_priority`: combined conditions

Multiple webhooks triggered in sequence:
    @observer(
        entity="Payment",
        event="INSERT",
        condition="status == 'approved'",
        actions=[
            webhook(
                url="https://inventory.example.com/reserve",
                body={"order_id": "{{ order_id }}", "items": "{{ items }}"},
            ),
            webhook(
                url="https://shipping.example.com/create-label",
                body={"order_id": "{{ order_id }}", "address": "{{ shipping_address }}"},
            ),
            webhook(
                url="https://analytics.example.com/track",
                body={"event": "payment_approved", "amount": "{{ amount }}"},
            ),
        ],
    )
    def process_payment():
        """Execute multiple webhooks in order."""
        pass

Execution:
    Payment approved
        ↓ (webhook 1)
    Inventory reserved
        ↓ (webhook 2)
    Shipping label created
        ↓ (webhook 3)
    Analytics event tracked

If webhook 2 fails, it is retried; webhook 3 may either wait for the retry or execute in parallel.
One observer triggers different webhooks based on data:
    @observer(
        entity="User",
        event="INSERT",
        actions=[
            webhook(
                url="https://crm.example.com/contacts",
                condition="country == 'US'",
                body={"email": "{{ email }}", "country": "{{ country }}"},
            ),
            webhook(
                url="https://gdpr.example.com/register",
                condition="country IN ('DE', 'FR', 'UK')",
                body={"email": "{{ email }}", "country": "{{ country }}", "gdpr_required": True},
            ),
            webhook(
                url="https://analytics.example.com/signup",
                body={"user_id": "{{ id }}", "signup_date": "{{ created_at }}"},
            ),
        ],
    )
    def fan_out_user_registration():
        """Route webhooks based on user attributes."""
        pass

Batch multiple events before sending webhook:
    @observer(
        entity="Order",
        event="INSERT",
        condition="true",
        actions=[
            webhook(
                url="https://analytics.example.com/batch",
                batch={
                    "size": 10,
                    "timeout": "5s",
                },
                body={
                    "orders": [
                        {"id": "{{ id }}", "total": "{{ total }}"}
                    ],
                    "batch_timestamp": "{{ _timestamp }}",
                },
            )
        ],
    )
    def batch_order_analytics():
        """Aggregate 10 orders or wait 5s before sending."""
        pass

Batching reduces the number of HTTP requests the receiver has to handle.
Retry behavior is configured under `[webhooks.retry]`:

    [webhooks.retry]
    max_attempts = 3
    backoff = "exponential"
    initial_delay = "100ms"
    max_delay = "5s"

Timeline:
    Attempt 1: 0ms (immediate)
    Attempt 2: 100ms (100ms delay)
    Attempt 3: 500ms (exponential: 100ms * 2.5)
    If all fail → Dead Letter Queue

Per-observer retry override:

    @observer(
        entity="Payment",
        event="INSERT",
        retry={
            "max_attempts": 5,
            "backoff": "linear",
            "initial_delay": "1s",
        },
        actions=[...],
    )
    def critical_payment_notification():
        """More aggressive retries for critical payments."""
        pass

Retry rules by HTTP status code:

    [webhooks.retry]
    # Retry on transient errors
    retry_on = [408, 429, 500, 502, 503, 504]

    # Don't retry on client errors
    dont_retry_on = [400, 401, 403, 404, 422]

| Status | Retry? | Reason |
|---|---|---|
| 500 | Yes | Server error (transient) |
| 400 | No | Bad request (our payload is wrong) |
| 429 | Yes | Rate limited (temporary) |
| 401 | No | Unauthorized (credentials wrong) |
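The retry rules above can be pictured with a small sketch. This is not FraiseQL's implementation: `should_retry` and `backoff_delays` are hypothetical helper names, the status set is copied from the `retry_on` list, and the growth factor of 2 is an assumption chosen for illustration (the multiplier FraiseQL applies may differ).

```python
# Status codes copied from the retry_on list in the configuration above.
RETRY_ON = {408, 429, 500, 502, 503, 504}


def should_retry(status: int) -> bool:
    """Return True for statuses treated as transient.

    Anything not listed (including all dont_retry_on codes) is not retried.
    """
    return status in RETRY_ON


def backoff_delays(max_attempts: int, initial_ms: int, max_ms: int, factor: int = 2) -> list:
    """Delays (ms) to wait before attempts 2..max_attempts.

    Each delay grows by `factor` (an assumed value) and is capped at max_ms.
    """
    delays = []
    delay = initial_ms
    for _ in range(max_attempts - 1):
        delays.append(min(delay, max_ms))
        delay *= factor
    return delays


if __name__ == "__main__":
    print(should_retry(503), should_retry(404))
    print(backoff_delays(max_attempts=5, initial_ms=1000, max_ms=5000))
```

Capping each delay at `max_ms` keeps a long retry chain from waiting indefinitely, which is the role `max_delay` plays in the configuration.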
Webhooks that exhaust their retries go to the dead letter queue (DLQ):

    [webhooks.dlq]
    enabled = true
    backend = "postgresql"
    table = "webhook_dlq"
    retention = "7d"
    max_size = "10GB"

PostgreSQL:
    [webhooks.dlq]
    backend = "postgresql"
    table = "webhook_dlq"
    schema = "public"

NATS JetStream:

    [webhooks.dlq]
    backend = "nats"
    stream = "WEBHOOK_DLQ"
    bucket = "webhook-failures"

File-based:

    [webhooks.dlq]
    backend = "file"
    path = "/var/log/fraiseql/webhook_dlq.jsonl"
    rotation = "daily"
    compress = true

Query failed webhooks:
    SELECT * FROM webhook_dlq
    WHERE failed_at > NOW() - INTERVAL '24 hours'
    ORDER BY failed_at DESC;

Replay a failed webhook:
    from fraiseql.webhooks import replay_dlq

    # Replay single webhook
    replay_dlq(dlq_id="webhook_123")

    # Replay all failures from last 24 hours
    replay_dlq(
        failed_after="2025-02-10",
        filter={"observer": "on_payment_created"},
    )

    # Replay with custom delay
    replay_dlq(
        dlq_id="webhook_456",
        delay="30s",
    )

FraiseQL automatically signs all webhooks with HMAC-SHA256:
    [webhooks.signing]
    enabled = true
    algorithm = "sha256"
    secret = "${WEBHOOK_SECRET}"
    header = "X-FraiseQL-Signature"
    timestamp_header = "X-FraiseQL-Timestamp"

Signature format:

    X-FraiseQL-Signature: sha256=abcd1234...
    X-FraiseQL-Timestamp: 1707604800

Receiver-side verification (Python):
    import hashlib
    import hmac
    import time

    def verify_webhook(payload: bytes, signature: str, timestamp: str, secret: str) -> bool:
        # Check timestamp freshness (prevent replay attacks)
        try:
            ts = int(timestamp)
        except ValueError:
            return False  # Malformed timestamp header
        if abs(time.time() - ts) > 300:  # 5 minute window
            return False

        # Compute expected signature over "<timestamp>.<payload>"
        message = f"{timestamp}.".encode() + payload
        expected = "sha256=" + hmac.new(
            secret.encode(), message, hashlib.sha256
        ).hexdigest()

        # Constant-time comparison
        return hmac.compare_digest(signature, expected)
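To exercise the verifier without a live FraiseQL instance, a sender-side counterpart can help. The sketch below builds the two headers using the `timestamp.payload` scheme read off the verification code above. `sign_webhook` is a hypothetical helper, and it is an assumption that FraiseQL produces its signatures in exactly this way.

```python
import hashlib
import hmac
import time
from typing import Dict, Optional


def sign_webhook(payload: bytes, secret: str, timestamp: Optional[int] = None) -> Dict[str, str]:
    """Build signature headers for a payload (hypothetical test helper).

    Signs "<timestamp>.<payload>" with HMAC-SHA256, mirroring what the
    receiver-side verifier recomputes.
    """
    ts = str(int(time.time()) if timestamp is None else timestamp)
    message = f"{ts}.".encode() + payload
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return {
        "X-FraiseQL-Signature": "sha256=" + digest,
        "X-FraiseQL-Timestamp": ts,
    }


if __name__ == "__main__":
    headers = sign_webhook(b'{"event": "order.created"}', secret="test-secret")
    print(headers)
```

In a test, the returned headers can be passed along with the same payload and secret to the receiver's verification function, which should then accept the request.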
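    # Flask example
    import os

    from flask import Flask, request

    app = Flask(__name__)
    WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

    @app.route("/webhook", methods=["POST"])
    def handle_webhook():
        signature = request.headers.get("X-FraiseQL-Signature")
        timestamp = request.headers.get("X-FraiseQL-Timestamp")
        # Reject requests with missing headers before verifying
        if not signature or not timestamp or not verify_webhook(
            request.data, signature, timestamp, WEBHOOK_SECRET
        ):
            return "Invalid signature", 401

        data = request.json
        # Process webhook...
        return "OK", 200

Encrypt sensitive data in webhook payloads: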
    @observer(
        entity="User",
        event="INSERT",
        actions=[
            webhook(
                url="https://external.example.com/users",
                encryption={
                    "enabled": True,
                    "algorithm": "AES-256-GCM",
                    "key": "${WEBHOOK_ENCRYPTION_KEY}",
                },
                body={
                    "id": "{{ id }}",
                    "email": "{{ email }}",  # Will be encrypted
                    "ssn": "{{ ssn }}",  # Will be encrypted
                },
            )
        ],
    )
    def send_user_data():
        pass

Asynchronous dispatch keeps webhook delivery off the mutation path:

    [webhooks.dispatch]
    # Don't block mutation on webhook completion
    async = true
    timeout = "30s"

    # Queue failed webhooks in background
    background_retry = true
    max_queue_size = 1000

With async dispatch:
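    Mutation completes immediately
        ↓
    Webhook queued for delivery
        ↓
    (Happens in background)
    Webhook sent with retries

Batching is configured under `[webhooks.batching]`:

    [webhooks.batching]
    enabled = true
    batch_size = 50
    timeout = "10s"

With 100 order inserts arriving within the timeout window, the receiver gets 2 batched requests instead of 100 individual ones.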
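A size-or-timeout batcher is one way to picture this grouping. The sketch below is illustrative only and is not FraiseQL's implementation: `Batcher` and its methods are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```python
from typing import Any, List, Optional


class Batcher:
    """Flush when `size` items are pending or the oldest has waited `timeout_s`."""

    def __init__(self, size: int, timeout_s: float) -> None:
        self.size = size
        self.timeout_s = timeout_s
        self._items: List[Any] = []
        self._first_at: Optional[float] = None

    def add(self, item: Any, now: float) -> Optional[List[Any]]:
        """Queue an item; return a full batch once the size limit is reached."""
        if not self._items:
            self._first_at = now  # Timeout clock starts with the first item
        self._items.append(item)
        if len(self._items) >= self.size:
            return self._flush()
        return None

    def poll(self, now: float) -> Optional[List[Any]]:
        """Return a partial batch if the oldest item has waited long enough."""
        if self._items and now - self._first_at >= self.timeout_s:
            return self._flush()
        return None

    def _flush(self) -> List[Any]:
        batch, self._items = self._items, []
        self._first_at = None
        return batch


if __name__ == "__main__":
    batcher = Batcher(size=50, timeout_s=10.0)
    batches = []
    for order_id in range(100):
        full = batcher.add(order_id, now=0.0)
        if full is not None:
            batches.append(full)
    print(len(batches), [len(b) for b in batches])
```

The timeout path matters for quiet periods: without `poll`, a handful of pending events could wait indefinitely for the batch to fill.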
Outgoing webhooks can also be rate limited:

    [webhooks.rate_limiting]
    enabled = true
    max_per_second = 100
    burst = 200

Rate limiting prevents FraiseQL from overwhelming the external service.
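A steady rate combined with a burst allowance is commonly modeled as a token bucket. The sketch below shows that general technique with the values from the configuration above; whether FraiseQL uses a token bucket internally is an assumption, and `TokenBucket` is a hypothetical name. Time is passed in explicitly to keep the example deterministic.

```python
class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `burst`."""

    def __init__(self, rate: float, burst: int, now: float = 0.0) -> None:
        self.rate = rate
        self.capacity = float(burst)
        self.tokens = float(burst)  # Start full so an initial burst is allowed
        self.updated = now

    def allow(self, now: float) -> bool:
        """Refill tokens for the elapsed time, then try to spend one."""
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=100, burst=200)
    sent = sum(bucket.allow(now=0.0) for _ in range(250))
    print(sent)  # Requests allowed out of 250 arriving at the same instant
```

Requests that are not allowed would typically be queued and retried once tokens refill rather than dropped.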
For local testing, expose the receiver through an ngrok tunnel:

    # Start ngrok tunnel
    ngrok http 8000

    # Update webhook URL in config
    [webhooks]
    test_url = "https://abc123.ngrok.io/webhook"

    # Start local server
    python -m flask run --port 8000

Testing with a mock webhook server:

    from fraiseql.testing import MockWebhookServer

    def test_order_webhook():
        with MockWebhookServer(port=9999) as mock:
            # Create order (triggers webhook)
            result = client.mutate("""
                mutation {
                    createOrder(input: {
                        customer_email: "user@example.com"
                        total: 99.99
                    }) { id }
                }
            """)

            # Assert webhook was called
            assert mock.called
            assert mock.last_payload["customer_email"] == "user@example.com"

            # Assert retry behavior
            assert mock.call_count == 1  # or more if retried

Prometheus metrics:

    [observability.prometheus]
    enabled = true
    port = 9090
    namespace = "fraiseql"

Exposed metrics:
    fraiseql_webhook_total{observer="on_order_created"}
    fraiseql_webhook_duration_seconds{observer="on_order_created", status="success"}
    fraiseql_webhook_failures_total{observer="on_order_created", reason="timeout"}
    fraiseql_webhook_retries_total{observer="on_order_created"}
    fraiseql_dlq_size{observer="on_order_created"}

Example alert rules:

    # Prometheus alerts
    - alert: WebhookFailureRate
      expr: |
        rate(fraiseql_webhook_failures_total[5m]) /
        rate(fraiseql_webhook_total[5m]) > 0.05
      annotations:
        summary: "Webhook failure rate > 5%"

    - alert: DLQBacklog
      expr: fraiseql_dlq_size > 1000
      annotations:
        summary: "Webhook DLQ has {{ $value }} items"

If a webhook does not fire, check:
Observer condition matches data:

    SELECT * FROM orders WHERE /* observer condition */;

Webhook URL is accessible:

    curl -X POST https://api.example.com/notify

Observer is registered in schema.
If webhooks time out, possible solutions:
Increase timeout:

    [webhooks]
    timeout = "60s"

Make receiver async:

    @app.route("/webhook", methods=["POST"])
    def webhook_handler():
        task_queue.enqueue(process_payload, request.json)
        return "Accepted", 202  # Return immediately

Debug:
    SELECT observer, COUNT(*), MAX(failed_at)
    FROM webhook_dlq
    GROUP BY observer
    ORDER BY COUNT(*) DESC;

Remediate:
Advanced NATS
NATS Messaging Patterns — Multi-service messaging
Federation + NATS
Hybrid Integration — Combine sync and async