NATS Integration

FraiseQL integrates with NATS to publish database change events to a message bus, enabling event-driven architectures where downstream services react to data mutations in real time. This integration is configured through the observers system.

Before configuring NATS integration, you need:

  • A running NATS server (version 2.2 or later)
  • JetStream enabled on the server if you require durable, replayable streams
  • Network access from the machine running fraiseql to the NATS server
  1. Install and start the NATS server

    The quickest way to run NATS locally is with the official Docker image:

    docker run -d --name nats \
    -p 4222:4222 \
    -p 8222:8222 \
    nats:latest -js

    The -js flag enables JetStream. Port 4222 is the client port; port 8222 exposes the HTTP monitoring endpoint.

  2. Configure observers to use NATS transport

    In fraiseql.toml, set backend = "nats" under the [observers] section and provide a connection URL:

    [observers]
    enabled = true
    backend = "nats"
    nats_url = "${NATS_URL}" # e.g. nats://localhost:4222

    Advanced JetStream settings (stream name, consumer name, deduplication window, retention) go in the separate observer runtime config (fraiseql-observer.toml):

    [transport]
    backend = "nats"
    [transport.nats]
    url = "${NATS_URL}"
    stream_name = "fraiseql_events"
    subject_prefix = "fraiseql.mutation"
    consumer_name = "fraiseql_observer_worker"
    [transport.nats.jetstream]
    dedup_window_minutes = 5
    max_age_days = 7
    max_msgs = 10_000_000
    max_bytes = 10_737_418_240 # 10 GB
    ack_wait_secs = 30
    max_deliver = 3
  3. Start FraiseQL

    fraiseql run

    On startup, FraiseQL connects to NATS and logs a confirmation:

    [observers] transport: nats (nats://localhost:4222)
    [observers] JetStream enabled: true
    [observers] stream: fraiseql_events

The observers system supports three transport backends:

| Transport | Use Case | Configuration |
|---|---|---|
| postgres | Default, single-instance | PostgreSQL LISTEN/NOTIFY |
| nats | Multi-instance, distributed | NATS JetStream |
| in_memory | Testing only | In-memory event bus |

[observers.nats]
url = "nats://localhost:4222" # Supports multiple: "nats://n1:4222,nats://n2:4222"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"
stream_name = "fraiseql_events"

| Key | Type | Default | Description |
|---|---|---|---|
| url | string | nats://localhost:4222 | NATS server URL(s) |
| subject_prefix | string | fraiseql.mutation | Prefix for published subjects |
| consumer_name | string | fraiseql_observer_worker | Durable consumer name |
| stream_name | string | fraiseql_events | JetStream stream name |

[observers.nats.jetstream]
dedup_window_minutes = 5 # Message deduplication window
max_age_days = 7 # Maximum message retention
max_msgs = 10000000 # Maximum messages in stream
max_bytes = 10737418240 # Maximum stream size (10 GB)
ack_wait_secs = 30 # Acknowledgment timeout
max_deliver = 3 # Maximum delivery attempts
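
To sanity-check the JetStream limits above, it can help to convert them into base units. The following is only a sketch: `jetstream_limits_in_base_units` is a hypothetical helper and not part of FraiseQL. It assumes the "10 GB" comment means 10 GiB (10 × 1024³ bytes), which is what the configured number works out to.

```python
def jetstream_limits_in_base_units(settings: dict) -> dict:
    """Convert minute/day based settings into seconds and GiB (hypothetical helper)."""
    return {
        # Deduplication window: minutes -> seconds
        "dedup_window_secs": settings["dedup_window_minutes"] * 60,
        # Retention: days -> seconds
        "max_age_secs": settings["max_age_days"] * 24 * 60 * 60,
        # Stream size: bytes -> GiB (1 GiB = 1024**3 bytes)
        "max_bytes_gib": settings["max_bytes"] / 1024**3,
    }


# Values copied from the [observers.nats.jetstream] block above
limits = jetstream_limits_in_base_units(
    {
        "dedup_window_minutes": 5,
        "max_age_days": 7,
        "max_bytes": 10737418240,
    }
)
print(limits)
```

With the defaults shown, the deduplication window is 300 seconds, retention is 604800 seconds, and the size cap is exactly 10 GiB.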

All NATS settings can be overridden via environment variables:

| Variable | Overrides |
|---|---|
| FRAISEQL_NATS_URL | observers.nats.url |
| FRAISEQL_NATS_SUBJECT_PREFIX | observers.nats.subject_prefix |
| FRAISEQL_NATS_CONSUMER_NAME | observers.nats.consumer_name |
| FRAISEQL_NATS_STREAM_NAME | observers.nats.stream_name |
| FRAISEQL_NATS_DEDUP_WINDOW_MINUTES | observers.nats.jetstream.dedup_window_minutes |
| FRAISEQL_NATS_MAX_AGE_DAYS | observers.nats.jetstream.max_age_days |
| FRAISEQL_NATS_MAX_MSGS | observers.nats.jetstream.max_msgs |
| FRAISEQL_NATS_MAX_BYTES | observers.nats.jetstream.max_bytes |
| FRAISEQL_NATS_ACK_WAIT_SECS | observers.nats.jetstream.ack_wait_secs |
| FRAISEQL_NATS_MAX_DELIVER | observers.nats.jetstream.max_deliver |
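
The correspondence between variable names and configuration keys can be illustrated with a small sketch. This is not FraiseQL's implementation: `ENV_OVERRIDES` and `apply_env_overrides` are hypothetical names, only a few rows of the table are included, and values are left as strings because this document does not describe how FraiseQL converts types.

```python
import copy
import os

# A few rows copied from the table above: environment variable -> dotted config key.
ENV_OVERRIDES = {
    "FRAISEQL_NATS_URL": "observers.nats.url",
    "FRAISEQL_NATS_STREAM_NAME": "observers.nats.stream_name",
    "FRAISEQL_NATS_ACK_WAIT_SECS": "observers.nats.jetstream.ack_wait_secs",
    "FRAISEQL_NATS_MAX_DELIVER": "observers.nats.jetstream.max_deliver",
}


def apply_env_overrides(config: dict, environ=None) -> dict:
    """Return a copy of `config` with any set environment variables applied."""
    environ = os.environ if environ is None else environ
    result = copy.deepcopy(config)  # never mutate the caller's config
    for var, dotted_key in ENV_OVERRIDES.items():
        if var not in environ:
            continue
        *parents, leaf = dotted_key.split(".")
        node = result
        for part in parents:
            # Create intermediate tables such as [observers.nats.jetstream] on demand
            node = node.setdefault(part, {})
        node[leaf] = environ[var]  # kept as a string; type coercion is not shown
    return result


file_config = {"observers": {"nats": {"url": "nats://localhost:4222"}}}
effective = apply_env_overrides(
    file_config,
    {"FRAISEQL_NATS_URL": "nats://n1:4222", "FRAISEQL_NATS_MAX_DELIVER": "5"},
)
print(effective)
```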

When NATS is configured as the observer transport, FraiseQL automatically publishes structured JSON messages to NATS subjects whenever an observer-triggering event occurs.

The NATS subject follows the pattern:

{subject_prefix}.{entity_lowercase}.{event_lowercase}
# e.g., fraiseql.mutation.order.insert
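
Consumers that need to compute the subject for a given entity and event can apply the same pattern. The helper below is a minimal sketch with a hypothetical name, `build_subject`. It assumes plain lowercasing of the entity and event names, as the pattern above suggests; how multi-word entity names are rendered is not covered here.

```python
def build_subject(subject_prefix: str, entity: str, event: str) -> str:
    """Build a subject following {subject_prefix}.{entity_lowercase}.{event_lowercase}."""
    return f"{subject_prefix}.{entity.lower()}.{event.lower()}"


print(build_subject("fraiseql.mutation", "Order", "INSERT"))
# fraiseql.mutation.order.insert
```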

Every NATS message follows this JSON structure:

{
  "event": "INSERT",
  "entity": "Order",
  "subject": "fraiseql.mutation.order.insert",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "id": "ord_abc123",
    "total": 149.99,
    "status": "pending",
    "customer_id": "cus_xyz789"
  }
}

| Field | Description |
|---|---|
| event | The database operation: INSERT, UPDATE, or DELETE |
| entity | The schema type that triggered the change |
| subject | The full NATS subject the message was published to |
| timestamp | ISO 8601 UTC timestamp of the database event |
| data | The full row payload from the triggering operation |
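
A consumer may want to validate this envelope before acting on it. The following sketch parses a raw message body into a typed object. `MutationEvent` and `parse_mutation_event` are hypothetical names chosen for illustration; only the five fields documented above are assumed to exist.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

VALID_EVENTS = {"INSERT", "UPDATE", "DELETE"}


@dataclass(frozen=True)
class MutationEvent:
    event: str
    entity: str
    subject: str
    timestamp: datetime
    data: dict[str, Any]


def parse_mutation_event(raw: bytes) -> MutationEvent:
    """Decode a message body and check it against the documented fields."""
    doc = json.loads(raw)
    if doc["event"] not in VALID_EVENTS:
        raise ValueError(f"unexpected event type: {doc['event']!r}")
    # Older Python versions reject a trailing "Z" in fromisoformat()
    timestamp = datetime.fromisoformat(doc["timestamp"].replace("Z", "+00:00"))
    return MutationEvent(doc["event"], doc["entity"], doc["subject"], timestamp, doc["data"])


raw = b"""{
  "event": "INSERT",
  "entity": "Order",
  "subject": "fraiseql.mutation.order.insert",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {"id": "ord_abc123", "total": 149.99, "status": "pending"}
}"""
parsed = parse_mutation_event(raw)
print(parsed.entity, parsed.data["id"], parsed.timestamp.isoformat())
```

A missing field raises `KeyError`, and an unknown operation raises `ValueError`, so malformed messages fail loudly instead of being half-processed.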

Downstream services subscribe to NATS subjects independently of FraiseQL using any NATS client library.

import asyncio
import json

import nats


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    async def order_handler(msg):
        payload = json.loads(msg.data.decode())
        order_id = payload["data"]["id"]
        print(f"New order received: {order_id}")
        await msg.ack()

    # Durable consumer: survives service restarts
    await js.subscribe(
        "fraiseql.mutation.order.insert",
        durable="order-processor",
        cb=order_handler,
    )

    # Block forever so the subscription stays active
    await asyncio.Event().wait()


asyncio.run(main())

Core NATS (without JetStream) provides at-most-once delivery. JetStream adds persistence, acknowledgement, and replay. Enable it for any production workload where losing events is unacceptable.

The JetStream stream is automatically created on startup with these defaults:

  • Stream name: fraiseql_events (configurable via stream_name)
  • Subjects: Matches the configured subject_prefix with wildcard
  • Retention: Limits-based (discards oldest when max_msgs or max_bytes exceeded)
  • Acknowledgment: Required (messages redelivered if not acknowledged)
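
NATS subject wildcards follow two rules: `*` matches exactly one token, and `>` matches one or more trailing tokens. The sketch below implements that matching so you can check which subjects a pattern would capture. `subject_matches` is a hypothetical helper, and the pattern `fraiseql.mutation.>` is an assumption about what "subject_prefix with wildcard" expands to.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a pattern containing * or > wildcards."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" is only valid as the last token and needs at least one token to match
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False  # subject is shorter than the pattern
        if token != "*" and token != subject_tokens[i]:
            return False  # literal token mismatch
    # Without ">", both sides must have the same number of tokens
    return len(pattern_tokens) == len(subject_tokens)


print(subject_matches("fraiseql.mutation.>", "fraiseql.mutation.order.insert"))
print(subject_matches("fraiseql.mutation.*.insert", "fraiseql.mutation.order.update"))
```

The same rules apply to consumer filter subjects and to the publish and subscribe permissions in a NATS server configuration.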

Messages that exceed the maximum delivery attempt count (max_deliver) are dropped. Monitor consumer lag with the NATS CLI:

nats consumer info fraiseql_events order-processor

JetStream handles retries automatically based on ack_wait_secs and max_deliver:

| Attempt | Timing |
|---|---|
| 1 | Immediate delivery |
| 2 | After ack_wait_secs if not acknowledged |
| 3 | After another ack_wait_secs |
| Failure | Message dropped after max_deliver attempts |
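
The timing in this table can be worked out for any combination of settings. The sketch below uses hypothetical helper names and assumes the simplest case: the handler never acknowledges or negatively acknowledges the message, no custom backoff is configured, and network latency is ignored.

```python
def delivery_schedule(ack_wait_secs: int, max_deliver: int) -> list[int]:
    """Seconds after first publish at which each delivery attempt happens."""
    return [attempt * ack_wait_secs for attempt in range(max_deliver)]


def give_up_after_secs(ack_wait_secs: int, max_deliver: int) -> int:
    """Seconds after first publish when the final attempt's ack wait expires."""
    return ack_wait_secs * max_deliver


# Defaults from this page: ack_wait_secs = 30, max_deliver = 3
print(delivery_schedule(30, 3))   # [0, 30, 60]
print(give_up_after_secs(30, 3))  # 90
```

Under these assumptions, a handler that is down for longer than about 90 seconds will miss messages delivered at the start of the outage, which is worth keeping in mind when choosing ack_wait_secs and max_deliver.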

If your NATS server enforces authorization, grant FraiseQL publish and subscribe access to its subjects. The following snippet is an excerpt from a NATS server configuration file:

accounts {
  fraiseql_account {
    users = [
      {
        user: fraiseql
        permissions: {
          publish: ["fraiseql.mutation.>"]
          subscribe: ["fraiseql.mutation.>", "_INBOX.>"]
        }
      }
    ]
  }
}

_INBOX.> is required for request-reply patterns used internally by the NATS client.

For capturing existing PostgreSQL changes, you can run a bridge that forwards PostgreSQL LISTEN/NOTIFY events to NATS:

[observers]
backend = "nats"
run_bridge = true
[observers.bridge]
transport_name = "pg_to_nats"
batch_size = 100
poll_interval_secs = 1
notify_channel = "fraiseql_events"

The bridge polls the PostgreSQL change log and publishes to NATS, enabling gradual migration from LISTEN/NOTIFY to NATS without application changes.

FraiseQL exposes the following Prometheus metrics for the NATS integration:

| Metric | Description |
|---|---|
| fraiseql_nats_messages_published_total | Total messages published to NATS |
| fraiseql_nats_messages_received_total | Total messages received from NATS |
| fraiseql_nats_subscriptions_active | Currently active subscriptions |
| fraiseql_nats_jetstream_pending | Messages pending acknowledgement |
| fraiseql_nats_publish_latency_seconds | Histogram of publish latency |

Check NATS connection status via the health endpoint:

curl http://localhost:8080/health

Expected response:

{
  "status": "ok",
  "databases": { "primary": { "status": "ok" } },
  "observers": {
    "transport": "nats",
    "connected": true,
    "stream": "fraiseql_events"
  }
}
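
For automated checks such as a deployment probe, the same response can be evaluated in code. This is a sketch using only the standard library. The function names are hypothetical, and the field names are taken from the expected response above; a real deployment may return additional fields.

```python
import json
from urllib.request import urlopen


def nats_transport_healthy(health: dict) -> bool:
    """True when the health payload reports a connected NATS transport."""
    observers = health.get("observers", {})
    return (
        health.get("status") == "ok"
        and observers.get("transport") == "nats"
        and observers.get("connected") is True
    )


def fetch_health(url: str = "http://localhost:8080/health", timeout: float = 5.0) -> dict:
    """Fetch and decode the health endpoint (requires a running FraiseQL server)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


# Offline example using the expected response shown above
sample = {
    "status": "ok",
    "databases": {"primary": {"status": "ok"}},
    "observers": {"transport": "nats", "connected": True, "stream": "fraiseql_events"},
}
print(nats_transport_healthy(sample))
```

Against a live server, `nats_transport_healthy(fetch_health())` performs the same check on the real response.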

Keep payloads small. Include only the fields consumers need. Large payloads increase storage and network cost:

# Preferred: reference data, let consumers fetch details if needed
{"order_id": "ord_123", "customer_id": "cus_456"}
# Avoid: embedding full nested objects
{"order": full_order_with_all_fields, "customer": full_customer_object}

Design handlers to be idempotent. JetStream guarantees at-least-once delivery. A message may be redelivered after a crash or network partition. Guard against duplicate processing:

@subscribe("fraiseql.mutation.order.insert")
async def process_order(event):
    order_id = event.data["id"]
    if await is_already_processed(order_id):
        return  # Duplicate delivery: nothing to do
    await do_process_order(order_id)
    await mark_processed(order_id)
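
A self-contained version of the same idea is sketched below so the behavior can be tried without a NATS server. `IdempotentProcessor` is a hypothetical class. It keeps processed IDs in an in-memory set, which stands in for a durable store such as a database table, and it keys on `data.id`, which suits insert events like the one in the example above. It does not guard against two deliveries of the same message being processed concurrently.

```python
import asyncio


class IdempotentProcessor:
    """Run a handler at most once per event ID (in-memory sketch)."""

    def __init__(self, handler):
        self._handler = handler
        self._seen: set[str] = set()  # stand-in for a durable "processed" table

    async def process(self, event: dict) -> bool:
        key = event["data"]["id"]
        if key in self._seen:
            return False  # duplicate delivery: skip the handler
        await self._handler(event)
        # Mark as processed only after the handler succeeds,
        # so a failed attempt is retried on redelivery
        self._seen.add(key)
        return True


processed_ids = []


async def handle_order(event: dict) -> None:
    processed_ids.append(event["data"]["id"])


async def demo() -> list:
    processor = IdempotentProcessor(handle_order)
    event = {"event": "INSERT", "entity": "Order", "data": {"id": "ord_abc123"}}
    # Simulate JetStream delivering the same message twice
    return [await processor.process(event), await processor.process(event)]


results = asyncio.run(demo())
print(results, processed_ids)
```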

Use durable consumers in production. Ephemeral consumers lose their position when the subscriber disconnects. Durable consumers resume from where they left off.

Monitor consumer lag. If your consumer falls behind, increase the number of consumer instances or optimize processing time:

nats consumer info fraiseql_events order-processor

“NATS transport requires nats.url to be set”

Add the [observers.nats] section with a valid URL:

[observers.nats]
url = "nats://localhost:4222"

“run_bridge=true requires transport=nats”

The PostgreSQL-to-NATS bridge only works with the NATS transport. Make sure both settings are present:

[observers]
backend = "nats"
run_bridge = true

Messages not being delivered

  1. Verify NATS is running: nats server check connection
  2. Check JetStream is enabled: nats stream info fraiseql_events
  3. Confirm the consumer exists: nats consumer info fraiseql_events <consumer_name>
  4. Check FraiseQL logs for connection errors

Observers: Define the database change listeners that trigger NATS events

Subscriptions: Push real-time updates to browser clients over WebSocket

Deployment: Configure secrets and environment variables for production