
Observer-Webhook Integration Patterns

Observers and webhooks are each useful on their own; combined, they form the basis of an event-driven architecture. This guide shows production patterns for integrating the two.

Observers react to database mutations:

  • Watch specific entity changes (INSERT, UPDATE, DELETE)
  • Define conditions (field-level, value-based)
  • Trigger actions synchronously or asynchronously

Webhooks send external notifications:

  • HTTP POST to external services
  • Signed payloads for security
  • Automatic retries with exponential backoff

Together they enable:

  • Event-driven workflows
  • Multi-service coordination
  • Audit trails and compliance
  • Real-time synchronization

from fraiseql import type, observer, webhook

@type
class Order:
    id: ID
    customer_email: str
    total: float
    status: str

@observer(
    entity="Order",
    event="INSERT",
    actions=[
        webhook(
            url="https://api.example.com/notifications",
            method="POST",
            headers={"Authorization": "Bearer ${WEBHOOK_API_KEY}"},
            body={
                "event": "order.created",
                "order_id": "{{ id }}",
                "customer_email": "{{ customer_email }}",
                "total": "{{ total }}",
                "timestamp": "{{ _timestamp }}"
            }
        )
    ]
)
def on_order_created():
    """Notify external service when order is created."""
    pass

Execution flow:

  1. User creates order via GraphQL mutation
  2. Order inserted into database
  3. Observer triggers
  4. Webhook payload built with order data
  5. HMAC signature added
  6. POST sent to external service
  7. Response logged
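
Step 4 of the flow (building the payload from order data) can be illustrated with a small sketch. The `render_body` helper below is hypothetical and only mimics the `{{ field }}` placeholder syntax used in this guide; FraiseQL's actual template engine is not shown here and may behave differently, for example in how it types numeric values.

```python
import json
import re

# Hypothetical helper: matches placeholders such as "{{ id }}".
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_body(template, row):
    """Recursively replace "{{ field }}" placeholders with values from row."""
    if isinstance(template, str):
        return _PLACEHOLDER.sub(lambda m: str(row[m.group(1)]), template)
    if isinstance(template, dict):
        return {key: render_body(value, row) for key, value in template.items()}
    if isinstance(template, list):
        return [render_body(value, row) for value in template]
    return template  # numbers, booleans, None pass through unchanged


template = {
    "event": "order.created",
    "order_id": "{{ id }}",
    "customer_email": "{{ customer_email }}",
}
row = {"id": "ord_1", "customer_email": "user@example.com"}

payload = render_body(template, row)
print(json.dumps(payload))
```

In this sketch every substituted value becomes a string, which is an assumption made for simplicity.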

Only trigger webhooks for specific conditions:

@observer(
    entity="Order",
    event="UPDATE",
    condition="status.changed() && status == 'shipped'",
    actions=[
        webhook(
            url="https://shipping.example.com/tracking",
            body={
                "order_id": "{{ id }}",
                "tracking_number": "{{ tracking_number }}",
                "customer_email": "{{ customer_email }}"
            }
        )
    ]
)
def notify_shipping_update():
    """Only notify when status changes to 'shipped'."""
    pass

Condition types:

  • field.changed() — Any change to field
  • field.old == value && field.new == other — Specific state transition
  • total > 1000 — Business logic
  • Complex: (status == 'shipped' && total > 500) || is_priority
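
To make the semantics of `field.changed()` concrete, here is a minimal sketch of how such a condition could be evaluated against the old and new versions of a row. The function names and the dict-based row representation are assumptions for illustration, not FraiseQL's condition engine.

```python
def changed(old: dict, new: dict, field: str) -> bool:
    """True when the field's value differs between the old and new row."""
    return old.get(field) != new.get(field)


def became_shipped(old: dict, new: dict) -> bool:
    """Mirrors the condition: status.changed() && status == 'shipped'."""
    return changed(old, new, "status") and new.get("status") == "shipped"


before = {"id": 1, "status": "paid"}
after = {"id": 1, "status": "shipped"}
print(became_shipped(before, after))
```

Checking both the change and the new value prevents the webhook from firing again when an already shipped order is updated for some other reason.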

Multiple webhooks triggered in sequence:

@observer(
    entity="Payment",
    event="INSERT",
    condition="status == 'approved'",
    actions=[
        webhook(
            url="https://inventory.example.com/reserve",
            body={"order_id": "{{ order_id }}", "items": "{{ items }}"}
        ),
        webhook(
            url="https://shipping.example.com/create-label",
            body={"order_id": "{{ order_id }}", "address": "{{ shipping_address }}"}
        ),
        webhook(
            url="https://analytics.example.com/track",
            body={"event": "payment_approved", "amount": "{{ amount }}"}
        )
    ]
)
def process_payment():
    """Execute multiple webhooks in order."""
    pass

Execution:

Payment approved
↓ (webhook 1)
Inventory reserved
↓ (webhook 2)
Shipping label created
↓ (webhook 3)
Analytics event tracked

If webhook 2 fails, it is retried; webhook 3 may wait for it to finish or run in parallel.

One observer triggers different webhooks based on data:

@observer(
    entity="User",
    event="INSERT",
    actions=[
        webhook(
            url="https://crm.example.com/contacts",
            condition="country == 'US'",
            body={"email": "{{ email }}", "country": "{{ country }}"}
        ),
        webhook(
            url="https://gdpr.example.com/register",
            condition="country IN ('DE', 'FR', 'UK')",
            body={"email": "{{ email }}", "country": "{{ country }}", "gdpr_required": True}
        ),
        webhook(
            url="https://analytics.example.com/signup",
            body={"user_id": "{{ id }}", "signup_date": "{{ created_at }}"}
        )
    ]
)
def fan_out_user_registration():
    """Route webhooks based on user attributes."""
    pass
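
The routing logic of this fan-out can be sketched as a list of (URL, predicate) pairs, where an action without a condition always fires. The `ACTIONS` table and `select_targets` function are hypothetical names used only to illustrate the idea; the URLs and country lists are taken from the observer above.

```python
from typing import Callable, List, Optional, Tuple

# Each action pairs a target URL with an optional predicate on the new row.
Action = Tuple[str, Optional[Callable[[dict], bool]]]

ACTIONS: List[Action] = [
    ("https://crm.example.com/contacts", lambda u: u["country"] == "US"),
    ("https://gdpr.example.com/register", lambda u: u["country"] in ("DE", "FR", "UK")),
    ("https://analytics.example.com/signup", None),  # no condition: always fires
]


def select_targets(user: dict) -> List[str]:
    """Return the URLs whose condition matches the user, in declaration order."""
    return [url for url, cond in ACTIONS if cond is None or cond(user)]


print(select_targets({"id": 1, "country": "DE"}))
```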

Batch multiple events before sending webhook:

@observer(
    entity="Order",
    event="INSERT",
    condition="true",
    actions=[
        webhook(
            url="https://analytics.example.com/batch",
            batch={
                "size": 10,
                "timeout": "5s"
            },
            body={
                "orders": [
                    {"id": "{{ id }}", "total": "{{ total }}"}
                ],
                "batch_timestamp": "{{ _timestamp }}"
            }
        )
    ]
)
def batch_order_analytics():
    """Aggregate 10 orders or wait 5s before sending."""
    pass

Benefits:

  • Reduces webhook calls by up to 90% (one call per 10 events when batches fill)
  • Improves analytics ingestion efficiency
  • Lowers HTTP overhead
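
The "10 orders or 5 seconds" rule can be sketched as a small batcher that flushes when either limit is reached. This is an illustrative model, not FraiseQL's implementation: the `Batcher` class, its `tick` method, and the injectable clock are assumptions made so the behaviour can be tested deterministically.

```python
import time


class Batcher:
    """Collects items and flushes when `size` is reached or `timeout_s` elapses."""

    def __init__(self, size, timeout_s, send, clock=time.monotonic):
        self.size = size
        self.timeout_s = timeout_s
        self.send = send          # callable that receives one list per batch
        self.clock = clock
        self._items = []
        self._first_at = None     # arrival time of the oldest buffered item

    def add(self, item):
        if not self._items:
            self._first_at = self.clock()
        self._items.append(item)
        if len(self._items) >= self.size:
            self.flush()

    def tick(self):
        """Call periodically; flushes a partial batch once the timeout has passed."""
        if self._items and self.clock() - self._first_at >= self.timeout_s:
            self.flush()

    def flush(self):
        if self._items:
            batch, self._items = self._items, []
            self.send(batch)


batches = []
batcher = Batcher(size=10, timeout_s=5.0, send=batches.append)
for order_id in range(25):
    batcher.add(order_id)
batcher.flush()  # force out the partial batch instead of waiting for the timeout
print([len(b) for b in batches])
```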
[webhooks.retry]
max_attempts = 3
backoff = "exponential"
initial_delay = "100ms"
max_delay = "5s"

Timeline:

Attempt 1: 0ms (immediate)
Attempt 2: 100ms (after the 100ms initial delay)
Attempt 3: 300ms (after a further 200ms, assuming the delay doubles on each retry)
If all attempts fail → Dead Letter Queue
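
The delay schedule implied by this configuration can be computed with a short sketch. It assumes the delay doubles on each retry (the multiplier is not stated in the configuration above) and is capped at `max_delay`; `backoff_delays` is a hypothetical helper, not part of FraiseQL.

```python
from typing import List


def backoff_delays(max_attempts: int, initial_ms: int, max_ms: int,
                   factor: float = 2.0) -> List[int]:
    """Delay in ms before each retry (attempts 2..max_attempts), capped at max_ms."""
    delays = []
    delay = float(initial_ms)
    for _ in range(max_attempts - 1):
        delays.append(int(min(delay, max_ms)))
        delay *= factor  # assumed multiplier; adjust to match the real policy
    return delays


# Values from the [webhooks.retry] block: 3 attempts, 100ms initial, 5s cap.
print(backoff_delays(max_attempts=3, initial_ms=100, max_ms=5000))
```

With more attempts the cap matters: the delay stops growing once it reaches `max_delay`.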

@observer(
    entity="Payment",
    event="INSERT",
    retry={
        "max_attempts": 5,
        "backoff": "linear",
        "initial_delay": "1s"
    },
    actions=[...]
)
def critical_payment_notification():
    """More aggressive retries for critical payments."""
    pass

[webhooks.retry]
# Retry on transient errors
retry_on = [408, 429, 500, 502, 503, 504]
# Don't retry on client errors
dont_retry_on = [400, 401, 403, 404, 422]

Status   Retry?   Reason
500      Yes      Server error (transient)
400      No       Bad request (our payload is wrong)
429      Yes      Rate limited (temporary)
401      No       Unauthorized (credentials wrong)

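
The retry decision can be expressed as a simple lookup over the two status lists from the configuration above. In this sketch, status codes that appear in neither list are not retried; that default is an assumption, since the guide does not say how unlisted codes are handled.

```python
RETRY_ON = {408, 429, 500, 502, 503, 504}
DONT_RETRY_ON = {400, 401, 403, 404, 422}


def should_retry(status: int) -> bool:
    """Decide whether a failed delivery with this HTTP status is worth retrying."""
    if status in DONT_RETRY_ON:
        return False
    # Assumption: anything not explicitly listed as retryable is not retried.
    return status in RETRY_ON


for code in (500, 400, 429, 401):
    print(code, should_retry(code))
```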
[webhooks.dlq]
enabled = true
backend = "postgresql"
table = "webhook_dlq"
retention = "7d"
max_size = "10GB"

PostgreSQL:

[webhooks.dlq]
backend = "postgresql"
table = "webhook_dlq"
schema = "public"

NATS JetStream:

[webhooks.dlq]
backend = "nats"
stream = "WEBHOOK_DLQ"
bucket = "webhook-failures"

File-based:

[webhooks.dlq]
backend = "file"
path = "/var/log/fraiseql/webhook_dlq.jsonl"
rotation = "daily"
compress = true

Query failed webhooks:

SELECT * FROM webhook_dlq
WHERE failed_at > NOW() - INTERVAL '24 hours'
ORDER BY failed_at DESC;

Replay a failed webhook:

from fraiseql.webhooks import replay_dlq

# Replay single webhook
replay_dlq(dlq_id="webhook_123")

# Replay all failures since a given date, for one observer
replay_dlq(
    failed_after="2025-02-10",
    filter={"observer": "on_payment_created"}
)

# Replay with custom delay
replay_dlq(
    dlq_id="webhook_456",
    delay="30s"
)

FraiseQL automatically signs all webhooks with HMAC-SHA256:

[webhooks.signing]
enabled = true
algorithm = "sha256"
secret = "${WEBHOOK_SECRET}"
header = "X-FraiseQL-Signature"
timestamp_header = "X-FraiseQL-Timestamp"

Signature format:

X-FraiseQL-Signature: sha256=abcd1234...
X-FraiseQL-Timestamp: 1707604800

Receiver-side verification (Python):

import hashlib
import hmac
import os
import time

from flask import Flask, request

app = Flask(__name__)
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

def verify_webhook(payload: bytes, signature: str, timestamp: str, secret: str) -> bool:
    # Reject requests that lack either header
    if not signature or not timestamp:
        return False
    # Check timestamp freshness (prevent replay attacks)
    try:
        ts = int(timestamp)
    except ValueError:
        return False
    if abs(time.time() - ts) > 300:  # 5 minute window
        return False
    # Compute expected signature
    message = f"{timestamp}.".encode() + payload
    expected = "sha256=" + hmac.new(
        secret.encode(),
        message,
        hashlib.sha256
    ).hexdigest()
    # Constant-time comparison
    return hmac.compare_digest(signature, expected)

# Flask example
@app.route("/webhook", methods=["POST"])
def handle_webhook():
    if not verify_webhook(
        request.data,
        request.headers.get("X-FraiseQL-Signature"),
        request.headers.get("X-FraiseQL-Timestamp"),
        WEBHOOK_SECRET
    ):
        return "Invalid signature", 401
    data = request.json
    # Process webhook...
    return "OK", 200
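
When testing a receiver it helps to produce correctly signed requests yourself. The sketch below is the sender-side counterpart of the verification code above: it signs `"<timestamp>." + payload` with HMAC-SHA256, which is the scheme inferred from the receiver example rather than a confirmed description of FraiseQL internals. `sign_payload` is a hypothetical helper name.

```python
import hashlib
import hmac
import time
from typing import Dict, Optional


def sign_payload(payload: bytes, secret: str,
                 timestamp: Optional[int] = None) -> Dict[str, str]:
    """Build the two signature headers for a raw request body."""
    ts = str(int(time.time()) if timestamp is None else timestamp)
    message = f"{ts}.".encode() + payload
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return {
        "X-FraiseQL-Signature": f"sha256={digest}",
        "X-FraiseQL-Timestamp": ts,
    }


headers = sign_payload(b'{"event": "order.created"}', secret="test-secret")
print(headers["X-FraiseQL-Timestamp"], headers["X-FraiseQL-Signature"][:15])
```

Sign the exact bytes that are sent on the wire; re-serializing the JSON on either side can change whitespace or key order and invalidate the signature.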

Encrypt sensitive data in webhook payloads:

@observer(
    entity="User",
    event="INSERT",
    actions=[
        webhook(
            url="https://external.example.com/users",
            encryption={
                "enabled": True,
                "algorithm": "AES-256-GCM",
                "key": "${WEBHOOK_ENCRYPTION_KEY}"
            },
            body={
                "id": "{{ id }}",
                "email": "{{ email }}",  # Will be encrypted
                "ssn": "{{ ssn }}"  # Will be encrypted
            }
        )
    ]
)
def send_user_data():
    pass

[webhooks.dispatch]
# Don't block mutation on webhook completion
async = true
timeout = "30s"
# Queue failed webhooks in background
background_retry = true
max_queue_size = 1000

With async dispatch:

Mutation completes immediately
Webhook queued for delivery
(Happens in background)
Webhook sent with retries
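
The flow above (mutation returns immediately, delivery happens in the background) can be modelled with a bounded queue and a worker thread. This is a simplified sketch under assumptions: `BackgroundDispatcher` is a hypothetical class, failures are silently dropped where a real dispatcher would schedule a retry, and the bound mirrors `max_queue_size` from the configuration.

```python
import queue
import threading


class BackgroundDispatcher:
    """Accepts payloads without blocking and delivers them on a worker thread."""

    def __init__(self, deliver, max_queue_size=1000):
        self._deliver = deliver
        self._queue = queue.Queue(maxsize=max_queue_size)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, payload) -> bool:
        """Return immediately; False means the queue is full and the payload was rejected."""
        try:
            self._queue.put_nowait(payload)
            return True
        except queue.Full:
            return False

    def _run(self):
        while True:
            payload = self._queue.get()
            try:
                self._deliver(payload)
            except Exception:
                pass  # a real dispatcher would schedule a retry or write to the DLQ
            finally:
                self._queue.task_done()

    def drain(self):
        """Block until everything queued so far has been processed."""
        self._queue.join()


delivered = []
dispatcher = BackgroundDispatcher(deliver=delivered.append)
accepted = dispatcher.enqueue({"event": "order.created", "order_id": "ord_1"})
dispatcher.drain()
print(accepted, len(delivered))
```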
[webhooks.batching]
enabled = true
batch_size = 50
timeout = "10s"

With 100 order inserts:

  • Batch 1: 50 webhooks (0-50ms)
  • Batch 2: 50 webhooks (500-510ms)
  • Total: 2 HTTP calls instead of 100
[webhooks.rate_limiting]
enabled = true
max_per_second = 100
burst = 200

Prevents overwhelming external service:

  • Sustain: 100 webhooks/second
  • Burst: Up to 200/second
  • Queues excess with backoff
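
A common way to get "sustained rate plus burst" behaviour is a token bucket: tokens refill at `max_per_second` and the bucket holds at most `burst` tokens. Whether FraiseQL uses this exact algorithm is not stated, so treat the following as an illustrative sketch; the `TokenBucket` class and its injectable clock are assumptions.

```python
import time


class TokenBucket:
    """Allows `rate` sends per second on average, with bursts up to `burst`."""

    def __init__(self, rate, burst, clock=time.monotonic):
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)  # start full so an initial burst is allowed
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; False means the caller should queue the send."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(rate=100, burst=200)
allowed = sum(bucket.try_acquire() for _ in range(50))
print(allowed)
```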

# Start ngrok tunnel
ngrok http 8000

# Update webhook URL in config
[webhooks]
test_url = "https://abc123.ngrok.io/webhook"

# Start local server
python -m flask run --port 8000

from fraiseql.testing import MockWebhookServer

def test_order_webhook():
    with MockWebhookServer(port=9999) as mock:
        # Create order (triggers webhook)
        result = client.mutate("""
            mutation {
                createOrder(input: {
                    customer_email: "user@example.com"
                    total: 99.99
                }) {
                    id
                }
            }
        """)
        # Assert webhook was called
        assert mock.called
        assert mock.last_payload["customer_email"] == "user@example.com"
        # Assert retry behavior
        assert mock.call_count == 1  # or more if retried

[observability.prometheus]
enabled = true
port = 9090
namespace = "fraiseql"

Exposed metrics:

fraiseql_webhook_total{observer="on_order_created"}
fraiseql_webhook_duration_seconds{observer="on_order_created", status="success"}
fraiseql_webhook_failures_total{observer="on_order_created", reason="timeout"}
fraiseql_webhook_retries_total{observer="on_order_created"}
fraiseql_dlq_size{observer="on_order_created"}
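
# Prometheus alerts
# Both sides are aggregated by observer so the division matches series
# even though the failure counter carries an extra "reason" label.
- alert: WebhookFailureRate
  expr: |
    sum by (observer) (rate(fraiseql_webhook_failures_total[5m]))
      /
    sum by (observer) (rate(fraiseql_webhook_total[5m])) > 0.05
  annotations:
    summary: "Webhook failure rate > 5%"
- alert: DLQBacklog
  expr: fraiseql_dlq_size > 1000
  annotations:
    summary: "Webhook DLQ has {{ $value }} items"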

Check:

  1. Observer condition matches data

    SELECT * FROM orders WHERE /* observer condition */;
  2. Webhook URL is accessible

    curl -X POST https://api.example.com/notify
  3. Observer is registered in schema

Solutions:

  • Increase timeout:

    [webhooks]
    timeout = "60s"
  • Make receiver async:

    @app.route("/webhook", methods=["POST"])
    def webhook_handler():
        task_queue.enqueue(process_payload, request.json)
        return "Accepted", 202  # Return immediately

Debug:

SELECT observer, COUNT(*), MAX(failed_at)
FROM webhook_dlq
GROUP BY observer
ORDER BY COUNT(*) DESC;

Remediate:

  • Fix receiver service
  • Replay DLQ
  • Adjust retry policy