Observers
Observers — Define the database change listeners that trigger NATS events
FraiseQL integrates with NATS to publish database change events to a message bus, enabling event-driven architectures where downstream services react to data mutations in real time. This integration is configured through the observers system.
Before configuring NATS integration, you need network access from fraiseql to the NATS server.

Install and start the NATS server
The quickest way to run NATS locally is with the official Docker image:
```bash
docker run -d --name nats \
  -p 4222:4222 \
  -p 8222:8222 \
  nats:latest -js -m 8222
```

The -js flag enables JetStream, and -m 8222 turns on the HTTP monitoring endpoint. Port 4222 is the client port; port 8222 exposes the monitoring endpoint.
Configure observers to use NATS transport
In fraiseql.toml, set backend = "nats" in the [observers] section and provide a connection URL:

```toml
[observers]
enabled = true
backend = "nats"
nats_url = "${NATS_URL}"  # e.g. nats://localhost:4222
```

Advanced JetStream settings (stream name, consumer name, deduplication window, retention) go in the separate observer runtime config (fraiseql-observer.toml):
```toml
[transport]
backend = "nats"

[transport.nats]
url = "${NATS_URL}"
stream_name = "fraiseql_events"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"

[transport.nats.jetstream]
dedup_window_minutes = 5
max_age_days = 7
max_msgs = 10_000_000
max_bytes = 10_737_418_240  # 10 GB
ack_wait_secs = 30
max_deliver = 3
```

Start FraiseQL

```bash
fraiseql run
```

On startup, FraiseQL connects to NATS and logs a confirmation:

```
[observers] transport: nats (nats://localhost:4222)
[observers] JetStream enabled: true
[observers] stream: fraiseql_events
```

The observers system supports three transport backends:
| Transport | Use Case | Configuration |
|---|---|---|
| postgres | Default, single-instance | PostgreSQL LISTEN/NOTIFY |
| nats | Multi-instance, distributed | NATS JetStream |
| in_memory | Testing only | In-memory event bus |
```toml
[observers.nats]
url = "nats://localhost:4222"  # Supports multiple: "nats://n1:4222,nats://n2:4222"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"
stream_name = "fraiseql_events"
```

| Key | Type | Default | Description |
|---|---|---|---|
| url | string | nats://localhost:4222 | NATS server URL(s) |
| subject_prefix | string | fraiseql.mutation | Prefix for published subjects |
| consumer_name | string | fraiseql_observer_worker | Durable consumer name |
| stream_name | string | fraiseql_events | JetStream stream name |
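The url key accepts several servers in one comma-separated string. How FraiseQL parses that value internally is not shown here; the sketch below only illustrates how a downstream service might split the same setting into a list (for example, to pass to a client that takes a list of servers). The function name and the scheme check are assumptions made for this example.

```python
def parse_nats_servers(url_setting: str) -> list[str]:
    """Split a comma-separated NATS URL setting into individual server URLs."""
    servers = [part.strip() for part in url_setting.split(",") if part.strip()]
    if not servers:
        raise ValueError("no NATS server URL provided")
    for server in servers:
        # Assumption for this sketch: only nats:// and tls:// URLs are accepted.
        if not server.startswith(("nats://", "tls://")):
            raise ValueError(f"unsupported NATS URL scheme: {server!r}")
    return servers


print(parse_nats_servers("nats://n1:4222,nats://n2:4222"))
# ['nats://n1:4222', 'nats://n2:4222']
```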
```toml
[observers.nats.jetstream]
dedup_window_minutes = 5   # Message deduplication window
max_age_days = 7           # Maximum message retention
max_msgs = 10000000        # Maximum messages in stream
max_bytes = 10737418240    # Maximum stream size (10 GB)
ack_wait_secs = 30         # Acknowledgment timeout
max_deliver = 3            # Maximum delivery attempts
```

All NATS settings can be overridden via environment variables:

| Variable | Overrides |
|---|---|
| FRAISEQL_NATS_URL | observers.nats.url |
| FRAISEQL_NATS_SUBJECT_PREFIX | observers.nats.subject_prefix |
| FRAISEQL_NATS_CONSUMER_NAME | observers.nats.consumer_name |
| FRAISEQL_NATS_STREAM_NAME | observers.nats.stream_name |
| FRAISEQL_NATS_DEDUP_WINDOW_MINUTES | observers.nats.jetstream.dedup_window_minutes |
| FRAISEQL_NATS_MAX_AGE_DAYS | observers.nats.jetstream.max_age_days |
| FRAISEQL_NATS_MAX_MSGS | observers.nats.jetstream.max_msgs |
| FRAISEQL_NATS_MAX_BYTES | observers.nats.jetstream.max_bytes |
| FRAISEQL_NATS_ACK_WAIT_SECS | observers.nats.jetstream.ack_wait_secs |
| FRAISEQL_NATS_MAX_DELIVER | observers.nats.jetstream.max_deliver |
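The naming rule in the table above is regular: the last segment of the config key, upper-cased, with a FRAISEQL_NATS_ prefix. The following sketch shows one way to reason about which value wins. It assumes the precedence is environment variable, then config file, then the listed value; this is an illustration of the table, not FraiseQL's actual loader, and the function names are hypothetical.

```python
import os

# Values copied from the configuration examples above.
LISTED_VALUES = {
    "observers.nats.url": "nats://localhost:4222",
    "observers.nats.subject_prefix": "fraiseql.mutation",
    "observers.nats.consumer_name": "fraiseql_observer_worker",
    "observers.nats.stream_name": "fraiseql_events",
    "observers.nats.jetstream.dedup_window_minutes": 5,
    "observers.nats.jetstream.max_age_days": 7,
    "observers.nats.jetstream.max_msgs": 10_000_000,
    "observers.nats.jetstream.max_bytes": 10_737_418_240,
    "observers.nats.jetstream.ack_wait_secs": 30,
    "observers.nats.jetstream.max_deliver": 3,
}


def env_var_for(key: str) -> str:
    """Map 'observers.nats.jetstream.max_deliver' to 'FRAISEQL_NATS_MAX_DELIVER'."""
    leaf = key.rsplit(".", 1)[-1]
    return f"FRAISEQL_NATS_{leaf.upper()}"


def effective_settings(file_values: dict, env=os.environ) -> dict:
    """Resolve each key: environment variable, then file value, then listed value."""
    resolved = {}
    for key, listed in LISTED_VALUES.items():
        value = file_values.get(key, listed)
        raw = env.get(env_var_for(key))
        if raw is not None:
            # Environment values are strings; coerce to the listed value's type.
            value = type(listed)(raw)
        resolved[key] = value
    return resolved


settings = effective_settings(
    {"observers.nats.stream_name": "orders_events"},
    env={"FRAISEQL_NATS_MAX_DELIVER": "5"},
)
print(settings["observers.nats.stream_name"])            # orders_events
print(settings["observers.nats.jetstream.max_deliver"])  # 5
```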
When NATS is configured as the observer transport, FraiseQL automatically publishes structured JSON messages to NATS subjects whenever an observer-triggering event occurs.
The NATS subject follows the pattern:
```
{subject_prefix}.{entity_lowercase}.{event_lowercase}
# e.g., fraiseql.mutation.order.insert
```

Every NATS message follows this JSON structure:

```json
{
  "event": "INSERT",
  "entity": "Order",
  "subject": "fraiseql.mutation.order.insert",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "id": "ord_abc123",
    "total": 149.99,
    "status": "pending",
    "customer_id": "cus_xyz789"
  }
}
```

| Field | Description |
|---|---|
| event | The database operation: INSERT, UPDATE, or DELETE |
| entity | The schema type that triggered the change |
| subject | The full NATS subject the message was published to |
| timestamp | ISO 8601 UTC timestamp of the database event |
| data | The full row payload from the triggering operation |
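To make the subject pattern and message envelope concrete, here is a small sketch that derives a subject from its parts and validates an incoming message body. The helper names build_subject and parse_event are hypothetical and exist only for this example; the field list comes from the table above.

```python
import json

REQUIRED_FIELDS = ("event", "entity", "subject", "timestamp", "data")


def build_subject(prefix: str, entity: str, event: str) -> str:
    """Apply the {subject_prefix}.{entity_lowercase}.{event_lowercase} pattern."""
    return f"{prefix}.{entity.lower()}.{event.lower()}"


def parse_event(raw: bytes) -> dict:
    """Decode a message body and check that every documented field is present."""
    payload = json.loads(raw.decode())
    missing = [field for field in REQUIRED_FIELDS if field not in payload]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return payload


raw_message = json.dumps({
    "event": "INSERT",
    "entity": "Order",
    "subject": "fraiseql.mutation.order.insert",
    "timestamp": "2024-01-15T10:30:00Z",
    "data": {"id": "ord_abc123", "status": "pending"},
}).encode()

event = parse_event(raw_message)
expected = build_subject("fraiseql.mutation", event["entity"], event["event"])
print(expected == event["subject"])  # True
```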
Downstream services subscribe to NATS subjects independently of FraiseQL using any NATS client library.
Python:

```python
import asyncio
import json

import nats


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    async def order_handler(msg):
        payload = json.loads(msg.data.decode())
        order_id = payload["data"]["id"]
        print(f"New order received: {order_id}")
        await msg.ack()

    # Durable consumer: survives service restarts
    await js.subscribe(
        "fraiseql.mutation.order.insert",
        durable="order-processor",
        cb=order_handler,
    )

    await asyncio.Event().wait()


asyncio.run(main())
```

TypeScript:

```typescript
import { connect, JSONCodec } from 'nats';

const nc = await connect({ servers: 'nats://localhost:4222' });
const js = nc.jetstream();
const jc = JSONCodec();

const consumer = await js.consumers.get('fraiseql_events', 'order-processor');
const messages = await consumer.consume();

for await (const msg of messages) {
  const payload = jc.decode(msg.data) as {
    event: string;
    entity: string;
    subject: string;
    timestamp: string;
    data: Record<string, unknown>;
  };

  const orderId = payload.data['id'];
  console.log(`New order received: ${orderId}`);
  msg.ack();
}
```

Go:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

type Event struct {
	Event     string                 `json:"event"`
	Entity    string                 `json:"entity"`
	Subject   string                 `json:"subject"`
	Timestamp string                 `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
}

func main() {
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	sub, err := js.Subscribe("fraiseql.mutation.order.insert", func(msg *nats.Msg) {
		var event Event
		if err := json.Unmarshal(msg.Data, &event); err != nil {
			log.Printf("Error parsing message: %v", err)
			return
		}
		fmt.Printf("New order received: %s\n", event.Data["id"])
		msg.Ack()
	}, nats.Durable("order-processor"))

	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	select {}
}
```

Core NATS (without JetStream) provides at-most-once delivery. JetStream adds persistence, acknowledgement, and replay. Enable it for any production workload where losing events is unacceptable.
The JetStream stream is automatically created on startup with these defaults:
- Name: fraiseql_events (configurable via stream_name)
- Subjects: subject_prefix with wildcard
- Limits: applied when max_msgs or max_bytes is exceeded

Messages that exceed the maximum delivery attempt count are dropped. Monitor consumer lag via NATS monitoring:

```bash
nats consumer info fraiseql_events order-processor
```

JetStream handles retries automatically based on ack_wait_secs and max_deliver:
| Attempt | Timing |
|---|---|
| 1 | Immediate delivery |
| 2 | After ack_wait_secs if not acknowledged |
| 3 | After another ack_wait_secs |
| Failure | Message dropped after max_deliver attempts |
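The retry table can be turned into a rough timeline. This sketch assumes the consumer never acknowledges the message, that no custom backoff is configured, and that network latency is negligible, so the times are approximate. The function name is hypothetical.

```python
def delivery_schedule(ack_wait_secs: int = 30, max_deliver: int = 3) -> list[int]:
    """Seconds after the first delivery at which each attempt is made."""
    return [attempt * ack_wait_secs for attempt in range(max_deliver)]


def give_up_after(ack_wait_secs: int = 30, max_deliver: int = 3) -> int:
    """Seconds until the last attempt's acknowledgment window has expired."""
    return max_deliver * ack_wait_secs


print(delivery_schedule())  # [0, 30, 60]
print(give_up_after())      # 90
```

With the values shown earlier (ack_wait_secs = 30, max_deliver = 3), a handler that keeps failing has roughly 90 seconds before the message is no longer redelivered, which is a useful bound when sizing alerts on consumer lag.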
If your NATS server enforces authorization, grant FraiseQL publish and subscribe access to its subjects. The following snippet is an excerpt from a NATS server configuration file:
```
accounts {
  fraiseql_account {
    users = [
      {
        user: fraiseql
        permissions: {
          publish: ["fraiseql.mutation.>"]
          subscribe: ["fraiseql.mutation.>", "_INBOX.>"]
        }
      }
    ]
  }
}
```

_INBOX.> is required for request-reply patterns used internally by the NATS client.
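When choosing permission patterns, it helps to check which subjects a pattern covers. In NATS subjects, * matches exactly one token and > matches one or more trailing tokens. The sketch below implements that matching rule in plain Python for quick checks; it is not part of FraiseQL or any NATS client, and the function name is made up for this example.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject is covered by a wildcard pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # '>' is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if token != "*" and token != s_tokens[i]:
            return False
    # Without a trailing '>', both sides must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)


print(subject_matches("fraiseql.mutation.>", "fraiseql.mutation.order.insert"))  # True
print(subject_matches("fraiseql.mutation.>", "fraiseql.mutation"))               # False
print(subject_matches("fraiseql.mutation.*.insert", "fraiseql.mutation.order.delete"))  # False
```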
For capturing existing PostgreSQL changes, you can run a bridge that forwards PostgreSQL LISTEN/NOTIFY events to NATS:
```toml
[observers]
backend = "nats"
run_bridge = true

[observers.bridge]
transport_name = "pg_to_nats"
batch_size = 100
poll_interval_secs = 1
notify_channel = "fraiseql_events"
```

The bridge polls the PostgreSQL change log and publishes to NATS, enabling gradual migration from LISTEN/NOTIFY to NATS without application changes.
FraiseQL exposes the following Prometheus metrics for the NATS integration:
| Metric | Description |
|---|---|
| fraiseql_nats_messages_published_total | Total messages published to NATS |
| fraiseql_nats_messages_received_total | Total messages received from NATS |
| fraiseql_nats_subscriptions_active | Currently active subscriptions |
| fraiseql_nats_jetstream_pending | Messages pending acknowledgement |
| fraiseql_nats_publish_latency_seconds | Histogram of publish latency |
Check NATS connection status via the health endpoint:
```bash
curl http://localhost:8080/health
```

Expected response:

```json
{
  "status": "ok",
  "databases": {
    "primary": { "status": "ok" }
  },
  "observers": {
    "transport": "nats",
    "connected": true,
    "stream": "fraiseql_events"
  }
}
```

Keep payloads small. Include only the fields consumers need. Large payloads increase storage and network cost:
```python
# Preferred: reference data, let consumers fetch details if needed
{"order_id": "ord_123", "customer_id": "cus_456"}

# Avoid: embedding full nested objects
{"order": full_order_with_all_fields, "customer": full_customer_object}
```

Design handlers to be idempotent. JetStream guarantees at-least-once delivery, so a message may be redelivered after a crash or network partition. Guard against duplicate processing:
```python
@subscribe("fraiseql.mutation.order.insert")
async def process_order(event):
    order_id = event.data["id"]
    if await is_already_processed(order_id):
        return
    await do_process_order(order_id)
    await mark_processed(order_id)
```

Use durable consumers in production. Ephemeral consumers lose their position when the subscriber disconnects. Durable consumers resume from where they left off.
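The idempotency guard described above can be exercised without a NATS server. The sketch below is a minimal, self-contained version: the in-memory set stands in for whatever durable store a real service would use (for example, a database table with a unique key), and every name in it is hypothetical.

```python
import asyncio

# Stand-in for a durable "already processed" store; a real service would
# persist this so that the guard survives restarts.
processed_ids: set[str] = set()
side_effects: list[str] = []


async def handle_order_event(event: dict) -> bool:
    """Process an order event once; return False if it was a duplicate."""
    order_id = event["data"]["id"]
    if order_id in processed_ids:
        return False  # redelivery: skip the work (and still ack upstream)
    side_effects.append(order_id)  # the actual business logic would go here
    processed_ids.add(order_id)
    return True


async def demo() -> tuple[bool, bool]:
    event = {"event": "INSERT", "entity": "Order", "data": {"id": "ord_abc123"}}
    first = await handle_order_event(event)
    second = await handle_order_event(event)  # simulated redelivery
    return first, second


result = asyncio.run(demo())
print(result)  # (True, False)
```

In this in-memory form the check and the write are not atomic across processes. With several consumer instances, the check and the mark would need to happen in one transaction or rely on a unique constraint.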
Monitor consumer lag. If your consumer falls behind, increase the number of consumer instances or optimize processing time:
```bash
nats consumer info fraiseql_events order-processor
```

“NATS transport requires nats.url to be set”
Add the [observers.nats] section with a valid URL:
```toml
[observers.nats]
url = "nats://localhost:4222"
```

“run_bridge=true requires transport=nats”
The PostgreSQL to NATS bridge only works with NATS transport. Ensure:
```toml
[observers]
backend = "nats"
run_bridge = true
```

Messages not being delivered

Check the server, the stream, and the consumer:

```bash
nats server check
nats stream info fraiseql_events
nats consumer info fraiseql_events <consumer_name>
```