
Observer and Webhook Patterns

FraiseQL supports event-driven architectures through its observer system. When a mutation succeeds, the Rust runtime can publish events to a message bus (NATS, Redis, or PostgreSQL LISTEN/NOTIFY). From there, other services can subscribe to those events and deliver outbound webhooks. This guide explains the mental model and how to configure it correctly.

FraiseQL’s observer system is infrastructure-level, not Python-level:

1. Client sends a GraphQL mutation
2. Rust runtime executes SQL (fn_create_order → mutation_response)
3. Rust runtime publishes an event to the message bus
4. Subscribers (other services, webhook delivery agents) react

There is no Python code involved at runtime. The Python SDK is compile-time only — it generates schema.json which is compiled to schema.compiled.json and handed to the Rust runtime. The Rust runtime manages all event delivery.

Observer configuration lives in fraiseql.toml. The [observers] section tells the Rust runtime which message bus to use for publishing mutation events.

[observers]
backend = "nats"
nats_url = "nats://localhost:4222"

When backend = "nats", the Rust runtime publishes a message to a NATS subject after each successful mutation. The subject follows the pattern fraiseql.{entity_type}.{operation} — for example, fraiseql.order.created (lowercase snake_case).
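The subject naming can be sketched as a small helper. This is an illustrative Python sketch, not runtime code (the runtime is Rust); the snake_case conversion matches the documented `fraiseql.order.created` example, but the suffixes for update/delete operations are assumptions.

```python
import re

# Assumed mapping from mutation operation to event suffix; only
# CREATE -> "created" is confirmed by the documented example.
OPERATION_SUFFIX = {"CREATE": "created", "UPDATE": "updated", "DELETE": "deleted"}

def event_subject(entity_type: str, operation: str) -> str:
    """Build a subject like fraiseql.order.created from an entity type."""
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", entity_type).lower()
    return f"fraiseql.{snake}.{OPERATION_SUFFIX[operation]}"

event_subject("Order", "CREATE")      # → "fraiseql.order.created"
event_subject("OrderItem", "CREATE")  # → "fraiseql.order_item.created"
```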

[observers]
backend = "redis"
redis_url = "redis://localhost:6379"

With Redis, events are published to Redis channels. Redis pub/sub is suitable for simpler deployments where NATS is not already in the stack.

[observers]
backend = "postgres"

With the postgres backend, the Rust runtime uses PostgreSQL’s NOTIFY on a channel (e.g., fraiseql_events) after each mutation. Subscribers use LISTEN on a persistent connection. This backend requires no additional infrastructure.

Regardless of backend, the event payload is a JSON object derived from the mutation_response returned by the PostgreSQL function:

{
  "event": "fraiseql.order.created",
  "entity_type": "order",
  "operation": "created",
  "entity_id": "550e8400-e29b-41d4-a716-446655440000",
  "entity": { ... },
  "timestamp": "2026-03-02T14:00:00Z"
}

The entity field contains the JSON representation of the created or updated record, sourced from the entity JSONB column of mutation_response.
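How the payload fields fit together can be sketched in Python. This is a model of the documented payload shape only, not the runtime's actual publishing code (which is Rust); the function name is hypothetical.

```python
from datetime import datetime, timezone

def build_event(entity_type: str, operation: str, entity_id: str, entity: dict) -> dict:
    """Assemble a payload with the same shape as the documented event JSON."""
    return {
        "event": f"fraiseql.{entity_type}.{operation}",
        "entity_type": entity_type,
        "operation": operation,
        "entity_id": entity_id,
        "entity": entity,  # sourced from mutation_response.entity (JSONB)
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

payload = build_event("order", "created", "550e8400-e29b-41d4-a716-446655440000", {"total": 99.5})
# payload["event"] → "fraiseql.order.created"
```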

Subscriber services are separate processes — they are not part of your FraiseQL schema definition. You write them in whatever language and framework you prefer.

import asyncio
import json

import httpx
import nats

WEBHOOK_URL = "https://notifications.example.com/orders"
WEBHOOK_SECRET = "..."  # from environment

async def handle_order_created(msg):
    event = json.loads(msg.data.decode())
    order_id = event["entity_id"]
    entity = event["entity"]
    async with httpx.AsyncClient() as client:
        await client.post(
            WEBHOOK_URL,
            json={
                "event": "order.created",
                "order_id": order_id,
                "customer_email": entity.get("customer_email"),
                "total": entity.get("total"),
            },
            headers={"Authorization": f"Bearer {WEBHOOK_SECRET}"},
            timeout=10.0,
        )

async def main():
    nc = await nats.connect("nats://localhost:4222")
    await nc.subscribe("fraiseql.order.created", cb=handle_order_created)
    await asyncio.Event().wait()  # run forever

asyncio.run(main())

This subscriber is a standalone process — it runs independently of the FraiseQL server and has no relation to your schema.json or Python schema definition files.

import asyncio
import json

import asyncpg

async def main():
    conn = await asyncpg.connect("postgresql://localhost/mydb")

    async def listener(conn, pid, channel, payload):
        event = json.loads(payload)
        print(f"Event received: {event['event']} for entity {event['entity_id']}")
        # dispatch webhook, update cache, etc.

    await conn.add_listener("fraiseql_events", listener)
    await asyncio.Event().wait()

asyncio.run(main())

Delivering webhooks from event subscribers follows standard HTTP patterns. The FraiseQL runtime publishes events; your subscriber service is responsible for HTTP delivery, retries, and request signing.

A webhook receiver should be able to verify the authenticity of the payload. Sign with HMAC-SHA256:

import hashlib
import hmac
import json
import time

import httpx

def send_signed_webhook(url: str, payload: dict, secret: str) -> None:
    body = json.dumps(payload).encode()
    timestamp = str(int(time.time()))
    message = f"{timestamp}.".encode() + body
    signature = "sha256=" + hmac.new(
        secret.encode(), message, hashlib.sha256
    ).hexdigest()
    httpx.post(
        url,
        content=body,
        headers={
            "Content-Type": "application/json",
            "X-Signature": signature,
            "X-Timestamp": timestamp,
        },
        timeout=15.0,
    )

Receiver-side verification:

import hashlib
import hmac
import time

def verify_webhook(body: bytes, signature: str, timestamp: str, secret: str) -> bool:
    # Reject stale payloads (> 5 minutes old)
    if abs(time.time() - int(timestamp)) > 300:
        return False
    message = f"{timestamp}.".encode() + body
    expected = "sha256=" + hmac.new(
        secret.encode(), message, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

Delivery should also retry transient failures with exponential backoff:

import asyncio

import httpx

async def deliver_with_retry(
    url: str,
    payload: dict,
    max_attempts: int = 3,
    initial_delay: float = 0.1,
) -> None:
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(url, json=payload, timeout=15.0)
            if response.status_code < 500:
                return  # success or unretryable client error
        except httpx.TransportError:
            pass  # network error — retry
        if attempt < max_attempts:
            await asyncio.sleep(delay)
            delay *= 2.5  # exponential backoff
    # all attempts exhausted — log to DLQ or alert
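The two halves of the HMAC scheme can be checked against each other. A minimal round-trip sketch; the secret and payload are illustrative:

```python
import hashlib
import hmac
import json
import time

secret = "whsec_example"  # hypothetical secret
body = json.dumps({"event": "order.created"}).encode()
timestamp = str(int(time.time()))
message = f"{timestamp}.".encode() + body

# Sender computes the signature over the timestamp-prefixed body.
signature = "sha256=" + hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()

# Receiver recomputes the same digest and compares in constant time.
expected = "sha256=" + hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
assert hmac.compare_digest(signature, expected)

# Tampering with the body produces a different digest.
tampered = hmac.new(
    secret.encode(), f"{timestamp}.".encode() + body + b"x", hashlib.sha256
).hexdigest()
assert not hmac.compare_digest(signature, "sha256=" + tampered)
```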

Your schema.py defines mutations and types. The observer backend configuration in fraiseql.toml is separate from schema authoring.

from enum import Enum

import fraiseql
from fraiseql.scalars import ID

@fraiseql.enum
class OrderStatus(Enum):
    PENDING = "pending"
    SHIPPED = "shipped"
    DELIVERED = "delivered"

@fraiseql.type
class Order:
    id: ID
    customer_email: str
    total: float
    status: OrderStatus

@fraiseql.input
class CreateOrderInput:
    customer_email: str
    total: float

@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
def create_order(input: CreateOrderInput) -> Order:
    """Create a new order. The Rust runtime publishes a NATS event after success."""
    pass

fraiseql.export_schema("schema.json")

There are no observer annotations in the schema definition. The mutation executes SQL, the Rust runtime publishes the event to NATS (or Redis/Postgres), and subscribers handle delivery.

The mutation function follows the standard FraiseQL trinity pattern and returns mutation_response:

CREATE TABLE tb_order (
    pk_order BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,  -- e.g. order number
    fk_user BIGINT NOT NULL REFERENCES tb_user(pk_user),
    total NUMERIC(12, 2) NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
);

CREATE FUNCTION fn_create_order(
    p_customer_email TEXT,
    p_total NUMERIC
) RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_order tb_order;
    v_response mutation_response;
BEGIN
    INSERT INTO tb_order (identifier, fk_user, total)
    VALUES (
        'ORD-' || to_char(NOW(), 'YYYYMMDDHH24MISS'),
        (SELECT pk_user FROM tb_user WHERE identifier = p_customer_email),
        p_total
    )
    RETURNING * INTO v_order;

    v_response.status := 'success';
    v_response.entity_id := v_order.id;
    v_response.entity_type := 'Order';
    v_response.entity := jsonb_build_object(
        'id', v_order.id,
        'customer_email', p_customer_email,
        'total', v_order.total,
        'status', v_order.status
    );
    RETURN v_response;
END;
$$;

After this function returns with status = 'success', the Rust runtime publishes the event payload (including entity) to the configured observer backend.

[project]
name = "my-fraiseql-app"
version = "1.0.0"

[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"

[observers]
backend = "nats"
nats_url = "nats://localhost:4222"

[security.enterprise]
audit_logging_enabled = true

[security.rate_limiting]
enabled = true
auth_start_max_requests = 100
auth_start_window_secs = 60
| Layer | Responsibility | Language |
| --- | --- | --- |
| Schema authoring | @fraiseql.type, @fraiseql.mutation, @scalar | Python |
| Compilation | fraiseql compile → schema.compiled.json | CLI (Rust) |
| GraphQL runtime | Query/mutation execution, event publishing | Rust |
| Message bus | Event delivery (NATS / Redis / Postgres) | Infrastructure |
| Subscribers | Event consumption, webhook delivery | Any language |

FraiseQL handles the first three layers of this table. The message bus and your subscriber services own the last two.

See also:

- Webhooks — Outgoing webhooks and incoming webhook verification reference
- Subscriptions — Real-time push to clients via WebSocket