Mutations
Observers
Observers provide event-driven logic for your FraiseQL API. They react to database changes and trigger actions like webhooks, emails, or Slack notifications.
Why Observers?
Section titled “Why Observers?”Traditional approaches to post-mutation logic:
- Application code: Business logic scattered across services
- Database triggers: Limited to SQL, hard to debug
- Message queues: Infrastructure complexity
Observers centralize event-driven logic in your fraiseql.toml configuration:
# fraiseql.toml — configure observer backend[observers]backend = "redis" # "in-memory" | "postgres" | "redis" | "nats"redis_url = "${REDIS_URL}"# nats_url = "${NATS_URL}"Observer actions (webhooks, Slack) are wired up in your schema configuration. See the sections below for each action type.
Observer lifecycle
Section titled “Observer lifecycle”Observer Anatomy
Section titled “Observer Anatomy”An observer consists of:
- Entity: The table/type being watched
- Event: INSERT, UPDATE, or DELETE
- Condition: When to trigger (optional Elo expression)
- Actions: What to do when triggered (webhook, Slack)
Observers are configured in fraiseql.toml. Database change events are delivered via PostgreSQL LISTEN/NOTIFY — the FraiseQL server registers listeners at startup and routes each event to matching observers.
# Example observer configuration in fraiseql.toml[observers]backend = "redis"redis_url = "${REDIS_URL}"Each observer action (webhook, Slack) is registered in your schema configuration file alongside your type definitions. The condition uses the same Elo expression syntax as custom scalar validation (see Elo Validation Language).
Events
Section titled “Events”FraiseQL observers react to three event types, delivered via PostgreSQL LISTEN/NOTIFY:
INSERT
Section titled “INSERT”Triggered when a new record is created. The event payload contains the new record’s data.
Entity: UserEvent: INSERT→ Delivers new user data to configured webhook/Slack actionsUPDATE
Section titled “UPDATE”Triggered when a record is modified. The condition can use field.changed() syntax to filter on specific field transitions:
Condition: status.changed() && status == 'shipped'→ Fires only when the status field transitions to 'shipped'Change detection in conditions:
field.changed()— True if the field value changed in this eventfield.old— Previous value (available in condition context for UPDATE events)field.new— New value (same asfieldin condition context)
DELETE
Section titled “DELETE”Triggered when a record is removed. The event payload contains the deleted record’s data.
Conditions
Section titled “Conditions”Conditions filter which events trigger an observer. They use the Elo expression language with && (AND) and || (OR) logical operators:
Simple Comparisons
Section titled “Simple Comparisons”total > 1000status == 'active'is_premium == trueChange Detection
Section titled “Change Detection”# Field changed to specific valuestatus.changed() && status == 'shipped'
# Field changed from specific valuestatus.old == 'pending' && status == 'approved'
# Any change to fieldemail.changed()Complex Logic
Section titled “Complex Logic”# Multiple conditionstotal > 1000 && is_premium == true
# OR conditionsstatus == 'failed' || retry_count > 3
# Field comparisonsquantity > min_quantityActions
Section titled “Actions”Webhook
Section titled “Webhook”The webhook action sends an HTTP POST to a configured URL. It supports custom headers and Mustache-style body templates:
Template variables:
{{field_name}}— Mustache-style field substitution in body templates{{_json}}— Complete record as JSON
Supported configuration:
url— Hardcoded URLurl_env— URL from environment variable (preferred for secrets)headers— Custom HTTP headers (use env var references for tokens)body_template— Mustache template; if omitted, the raw event data is sent
Webhook payload format
Section titled “Webhook payload format”When no body_template is specified, FraiseQL sends a standard JSON payload to the webhook URL:
// Webhook payload received by your endpoint{ "event": "INSERT", "table": "tb_order", "timestamp": "2026-02-25T10:00:00Z", "data": { "new": { "id": "order-123", "status": "confirmed", "total": 99.99, "user_id": "user-456" }, "old": null }}For UPDATE events, "old" contains the previous field values and "new" contains the updated values. For DELETE events, "new" is null and "old" contains the deleted record. The "timestamp" field is a stable event ID you can use for deduplication.
The email action sends messages via SMTP using the lettre crate. It supports TLS, SMTP authentication, and configurable sender/recipient addresses.
Supported configuration:
from— Sender email addressto— Recipient email address(es)subject— Subject line with{field_name}substitutionbody_template— Body text with{field_name}substitutionsmtp_hostorsmtp_host_env— SMTP server hostnamesmtp_port— SMTP port (default: 587)smtp_user_env— SMTP username from environment variablesmtp_password_env— SMTP password from environment variabletls—"required"(default),"opportunistic", or"none"
The Slack action sends a message to a Slack webhook URL. Configure the webhook URL via an environment variable and specify the message template:
Supported configuration:
webhook_urlorwebhook_url_env— Slack incoming webhook URLchannel— Slack channel (#channel-name)message— Message text with{field_name}substitution
Messages support standard Slack formatting (:emoji:, *bold*, _italic_).
Retry Configuration
Section titled “Retry Configuration”The observer system uses at-least-once delivery. When a webhook action fails (non-2xx response or network error), it is retried with backoff. Retry behavior is configured per-observer in your schema configuration:
Retry options:
max_attempts: Maximum retry count (default: 3)backoff_strategy:"fixed","linear", or"exponential"initial_delay_ms: First retry delay in millisecondsmax_delay_ms: Maximum delay between retries
Because delivery is at-least-once, your webhook endpoints should be idempotent — the same event may arrive more than once after a crash or network partition. Use the event’s unique ID from the payload for deduplication.
Dead-Letter Queue (DLQ) cap
Section titled “Dead-Letter Queue (DLQ) cap”Events that exhaust all retry attempts are moved to the dead-letter queue for manual inspection. By default the DLQ is unbounded. In long-running deployments where failures are sustained (e.g. a downstream webhook is down for hours), the DLQ can grow large.
Set max_dlq_size in [observers] to cap it:
[observers]backend = "redis"redis_url = "${REDIS_URL}"max_dlq_size = 1000 # drop newest entry + emit metric when cap reachedWhen the cap is reached, the newest incoming DLQ entry is dropped (the oldest entries are preserved for inspection) and the fraiseql_observer_dlq_overflow_total counter increments. Alert on that counter to detect sustained downstream failures.
For complete configuration reference including sizing parameters, see TOML Configuration: [observers].
Complete Example
Section titled “Complete Example”Here is an e-commerce observer setup showing the TOML configuration and corresponding Python schema types:
fraiseql.toml — configure the observer backend:
[observers]backend = "redis"redis_url = "${REDIS_URL}"schema.py — define the GraphQL types (observer actions are wired separately in schema configuration):
import fraiseqlfrom fraiseql.scalars import ID, DateTime
@fraiseql.typeclass Order: id: ID customer_email: str status: str total: float created_at: DateTime
@fraiseql.typeclass Payment: id: ID order_id: ID amount: float status: str processed_at: DateTime | NoneThe observer actions (high-value order webhook, order-shipped Slack message, payment-failed webhook) are registered via your schema configuration file alongside your type definitions. Each observer maps an entity + event + condition to one or more webhook or Slack actions.
Delivery Guarantees and CheckpointStrategy
Section titled “Delivery Guarantees and CheckpointStrategy”The observer system uses at-least-once delivery by default — the same event may be dispatched more than once after a crash or restart. For most side effects (sending a notification, calling a webhook) this is acceptable because the action is idempotent or the duplicate is harmless.
| Strategy | Behaviour | Use when |
|---|---|---|
AtLeastOnce (default) | Event may be processed more than once | Action is idempotent (update a read model, send a metric, post to Slack) |
EffectivelyOnce | Deduplication via an idempotency table | Action must not execute twice (charge a card, send an email, credit an account) |
AtLeastOnce (default)
Section titled “AtLeastOnce (default)”No configuration needed. Observers that send Slack notifications, update read models, or fire webhooks that are idempotent by design use this strategy:
# AtLeastOnce — default, no extra configuration@fraiseql.subscription( entity_type="Order", topic="order.created", operation="create")def notify_fulfillment(order: Order) -> None: """Notifies the fulfillment system. Webhook is idempotent.""" passEffectivelyOnce
Section titled “EffectivelyOnce”For non-idempotent actions, configure an idempotency table. FraiseQL records a row before executing the action; duplicate events are detected and skipped via ON CONFLICT DO NOTHING:
# EffectivelyOnce — deduplication via idempotency table@fraiseql.subscription( entity_type="Order", topic="order.created", operation="create", checkpoint=fraiseql.EffectivelyOnce( idempotency_table="billing_observer_keys" ))def charge_customer(order: Order) -> None: """Charges the customer. Must not execute twice.""" passThe idempotency table is created automatically on first startup if it does not exist. Its schema:
CREATE TABLE billing_observer_keys ( listener_id TEXT NOT NULL, event_key TEXT NOT NULL, processed_at TIMESTAMPTZ NOT NULL DEFAULT now(), PRIMARY KEY (listener_id, event_key));Old rows accumulate over time and must be pruned by a cleanup job. See the Observer Operations Runbook for cleanup strategies and retention policies.
Rust API
Section titled “Rust API”CheckpointStrategy is also available directly from the fraiseql_observers crate for embedding FraiseQL observers in a custom Rust application:
use fraiseql_observers::{CheckpointStrategy, ListenerConfig};
// Default: fast, no deduplicationlet strategy = CheckpointStrategy::AtLeastOnce;
// Effectively-once: records an idempotency key before processinglet strategy = CheckpointStrategy::EffectivelyOnce { idempotency_table: "fraiseql_observer_idempotency".to_string(),};| Method | Description |
|---|---|
is_duplicate(pool, listener_id, key) | Returns true if this event has already been processed |
record_idempotency_key(pool, listener_id, key) | Marks the event as processed (call before dispatching the action) |
Observer Log Output
Section titled “Observer Log Output”When RUST_LOG=debug is set, FraiseQL prints each observer invocation to stdout. Here is what firing the high-value order observer looks like:
[observer] on_high_value_order fired: Order{id=ord_123, total=1250.00}[observer] Sending Slack notification to #sales[observer] Slack notification delivered (channel=#sales, ts=1710509048.123456)[observer] on_high_value_order completed in 287msIf an action fails, the log includes the error and retry schedule:
[observer] on_high_value_order fired: Order{id=ord_124, total=2400.00}[observer] Sending Slack notification to #sales[observer] Slack notification failed: connection timeout (attempt 1/3)[observer] Retrying in 200ms (exponential backoff)[observer] Slack notification delivered on attempt 2How Observers Work
Section titled “How Observers Work”- Server startup: FraiseQL registers PostgreSQL
LISTENlisteners for database change events (not database triggers) - Write operation: Mutation modifies a
tb_table and emits apg_notify('fraiseql_changes', ...)notification - Event received: The FraiseQL server receives the notification via its LISTEN connection
- Condition evaluation: The observer’s Elo condition is evaluated against the event data
- Action dispatch: Matching observers queue their actions for execution
- Action execution: Actions execute asynchronously with retry logic
When multiple observers match the same event (same entity and event type), they execute in definition order — top-to-bottom as they appear in your schema file. This is deterministic and stable across deployments.
The pipeline from trigger to execution:
Observer execution pipeline
graph LR A[GraphQL Mutation] --> B[Write to tb_* Table] B --> C[pg_notify(fraiseql_changes)] C --> D{Condition Met?} D -->|Yes| E[Queue Actions] D -->|No| F[Skip] E --> G[Async Execution<br/>with Retry]Each observer can dispatch one or more action types:
Available action types
graph LR O[Observer] --> W[Webhook<br/>HTTP POST] O --> E[Email<br/>SMTP / SES] O --> S[Slack<br/>Channel / DM] O --> C[Custom<br/>Handler]Best Practices
Section titled “Best Practices”Keep Conditions Simple
Section titled “Keep Conditions Simple”Complex conditions are harder to debug. Prefer simple, readable conditions:
# Goodstatus == 'shipped'
# Avoid — hard to read and debugstatus == 'shipped' && total > 100 && customer_type == 'premium'For complex logic, create separate observers or use webhook endpoints that handle the routing logic.
Use Environment Variables
Section titled “Use Environment Variables”Never hardcode secrets or URLs in your configuration. Reference environment variables in webhook URL and header settings:
url_env = "WEBHOOK_URL" # URL from environment variableheaders.Authorization = "${API_TOKEN}" # Token from environment variableHandle Failures Gracefully
Section titled “Handle Failures Gracefully”Configure retry strategies based on whether your action is idempotent:
- Idempotent webhook (safe to retry):
max_attempts = 5,backoff_strategy = "exponential" - Non-idempotent (charge a card, send an SMS):
max_attempts = 1to avoid duplicates
Verify It Works
Section titled “Verify It Works”-
Configure the observer backend in
fraiseql.toml:[observers]backend = "in-memory" # Use in-memory for local testing -
Start a test webhook server (in another terminal):
Terminal window # Using Python for quick testingpython3 -m http.server 3001 & -
Create a user to trigger the observer:
Terminal window curl -X POST http://localhost:8080/graphql \-H "Content-Type: application/json" \-H "Authorization: Bearer $TOKEN" \-d '{"query": "mutation { createUser(input: { name: \"Test User\", email: \"test@example.com\" }) { id } }"}' -
Check webhook was received:
Terminal window # The webhook payload should appear in the http.server output:# {"event": "INSERT", "table": "tb_user", "timestamp": "2024-01-15T10:30:00Z", "data": {"new": {...}}} -
Check observer logs (if
RUST_LOG=debugis set):[observer] User INSERT event received: id=550e8400-e29b-41d4-a716-446655440000[observer] Sending webhook to http://localhost:3001/webhook[observer] Webhook delivered (status=200) -
Test with conditions: Configure an observer with
condition = "total > 100"and verify that low-value orders do not trigger it.
Troubleshooting
Section titled “Troubleshooting”Observer Not Firing
Section titled “Observer Not Firing”-
Check entity name matches the table name (case-sensitive):
entity="Order" # Matches tb_order -
Verify the mutation actually modifies the table:
- Observers only fire on INSERT/UPDATE/DELETE
- Read queries (SELECT) do not trigger observers
-
Check condition syntax:
# Validcondition="total > 100"# Invalid (no spaces around operator)condition="total>100" -
Enable debug logging:
Terminal window RUST_LOG=debug fraiseql run
Webhook Not Received
Section titled “Webhook Not Received”-
Test the webhook endpoint directly:
Terminal window curl -X POST http://your-webhook-url \-H "Content-Type: application/json" \-d '{"test": true}' -
Check network connectivity from FraiseQL server:
Terminal window # On the FraiseQL servercurl -v http://your-webhook-url -
Review retry logs:
[observer] Webhook failed: connection timeout (attempt 1/3)[observer] Retrying in 200ms
Performance Issues
Section titled “Performance Issues”If observers slow down mutations:
-
Use async execution (default) — observers fire after the mutation response is sent, not before
-
Reduce action count per observer — each action (webhook, Slack) adds latency
-
Move heavy processing to background jobs via NATS JetStream
Next Steps
Section titled “Next Steps”NATS Integration
Security