Skip to content

Multi-Tenant SaaS with Federation & NATS

A complete multi-tenant SaaS application combining per-tenant database isolation, Apollo Federation v2, NATS messaging for cross-service events, and Row Level Security for data isolation.

Multi-tenant SaaS: tenants through Apollo Gateway to federated services with NATS events Multi-tenant SaaS: tenants through Apollo Gateway to federated services with NATS events
Each FraiseQL service is a separate process with its own database. NATS events flow to an analytics worker.
auth-service/fraiseql.toml
[project]
name = "auth-service"
version = "1.0.0"
[database]
url = "${AUTH_DATABASE_URL}"
[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"
# OIDC config via environment variables — not in [security.pkce] TOML:
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_REDIRECT_URI
[security.pkce]
enabled = true
[federation]
enabled = true
service_name = "auth"
subgraph_url = "http://auth-service:8081/graphql"
[observers]
backend = "nats"
nats_url = "${NATS_URL}"
[[observers]]
table = "tb_organization"
event = "INSERT"
subject = "analytics.organization_created"
[security.enterprise]
enabled = true
log_level = "info"
content-service/fraiseql.toml
[project]
name = "content-service"
version = "1.0.0"
[database]
url = "${CONTENT_DATABASE_URL}"
[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"
# OIDC config via environment variables — not in [security.pkce] TOML:
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_REDIRECT_URI
[security.pkce]
enabled = true
[federation]
enabled = true
service_name = "content"
subgraph_url = "http://content-service:8082/graphql"
[observers]
backend = "nats"
nats_url = "${NATS_URL}"
[[observers]]
table = "tb_project"
event = "INSERT"
subject = "analytics.project_created"
[[observers]]
table = "tb_task"
event = "INSERT"
subject = "analytics.task_created"
[security.enterprise]
enabled = true
log_level = "info"
-- ============================================================
-- Auth service database: organizations and members
-- ============================================================
CREATE TABLE tb_organization (
pk_org BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
subscription_tier TEXT NOT NULL DEFAULT 'free'
);
CREATE UNIQUE INDEX idx_tb_organization_id ON tb_organization(id);
CREATE TABLE tb_member (
pk_member BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
fk_org BIGINT NOT NULL REFERENCES tb_organization(pk_org),
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
role TEXT NOT NULL DEFAULT 'member',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE UNIQUE INDEX idx_tb_member_id ON tb_member(id);
CREATE INDEX idx_tb_member_fk_org ON tb_member(fk_org);
-- ============================================================
-- Content service database: projects and tasks
-- ============================================================
CREATE TABLE tb_project (
pk_project BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
-- org_id from JWT injected as RLS context; no FK across service boundaries
org_id UUID NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE UNIQUE INDEX idx_tb_project_id ON tb_project(id);
CREATE INDEX idx_tb_project_org_id ON tb_project(org_id);
CREATE TABLE tb_task (
pk_task BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
fk_project BIGINT NOT NULL REFERENCES tb_project(pk_project),
title TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'todo',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE UNIQUE INDEX idx_tb_task_id ON tb_task(id);
CREATE INDEX idx_tb_task_fk_project ON tb_task(fk_project);
-- ============================================================
-- Read views (JSONB data column — pk_ never exposed)
-- ============================================================
CREATE VIEW v_organization AS
SELECT
o.id,
jsonb_build_object(
'id', o.id::text,
'identifier', o.identifier,
'name', o.name,
'subscription_tier', o.subscription_tier,
'created_at', o.created_at
) AS data
FROM tb_organization o;
CREATE VIEW v_member AS
SELECT
m.id,
jsonb_build_object(
'id', m.id::text,
'identifier', m.identifier,
'name', m.name,
'email', m.email,
'role', m.role,
'created_at', m.created_at
) AS data
FROM tb_member m;
CREATE VIEW v_project AS
SELECT
p.id,
jsonb_build_object(
'id', p.id::text,
'identifier', p.identifier,
'name', p.name,
'org_id', p.org_id::text,
'created_at', p.created_at
) AS data
FROM tb_project p;
CREATE VIEW v_task AS
SELECT
t.id,
jsonb_build_object(
'id', t.id::text,
'identifier', t.identifier,
'title', t.title,
'status', t.status,
'created_at', t.created_at
) AS data
FROM tb_task t;
-- ============================================================
-- Mutation functions
-- ============================================================
CREATE FUNCTION fn_create_organization(
p_identifier TEXT,
p_name TEXT
) RETURNS mutation_response LANGUAGE plpgsql AS $$
DECLARE
v_id UUID;
v_result mutation_response;
BEGIN
INSERT INTO tb_organization (identifier, name)
VALUES (p_identifier, p_name)
RETURNING id INTO v_id;
v_result.status := 'success';
v_result.entity_id := v_id;
v_result.entity_type := 'Organization';
RETURN v_result;
END;
$$;
CREATE FUNCTION fn_create_project(
p_identifier TEXT,
p_org_id UUID,
p_name TEXT
) RETURNS mutation_response LANGUAGE plpgsql AS $$
DECLARE
v_id UUID;
v_result mutation_response;
BEGIN
INSERT INTO tb_project (identifier, org_id, name)
VALUES (p_identifier, p_org_id, p_name)
RETURNING id INTO v_id;
v_result.status := 'success';
v_result.entity_id := v_id;
v_result.entity_type := 'Project';
RETURN v_result;
END;
$$;
CREATE FUNCTION fn_create_task(
p_identifier TEXT,
p_fk_project BIGINT,
p_title TEXT
) RETURNS mutation_response LANGUAGE plpgsql AS $$
DECLARE
v_id UUID;
v_result mutation_response;
BEGIN
INSERT INTO tb_task (identifier, fk_project, title)
VALUES (p_identifier, p_fk_project, p_title)
RETURNING id INTO v_id;
v_result.status := 'success';
v_result.entity_id := v_id;
v_result.entity_type := 'Task';
RETURN v_result;
END;
$$;
-- Enable RLS for tenant isolation in the content service
ALTER TABLE tb_project ENABLE ROW LEVEL SECURITY;
ALTER TABLE tb_task ENABLE ROW LEVEL SECURITY;
-- Projects visible only to the org extracted from JWT (set by FraiseQL Rust runtime)
CREATE POLICY project_org_isolation ON tb_project
USING (org_id = current_setting('app.org_id')::uuid);
-- Tasks inherit project isolation via JOIN — no separate policy needed if accessed via v_task
-- filtered by fk_project.
-- For organizations in auth service
ALTER TABLE tb_organization ENABLE ROW LEVEL SECURITY;
CREATE POLICY org_owner_only ON tb_organization
USING (id = current_setting('app.org_id')::uuid);
auth-service/schema.py
import fraiseql
from fraiseql.scalars import ID, DateTime
from typing import Annotated
@fraiseql.type
class Organization:
"""A tenant organization."""
id: ID
identifier: str
name: str
subscription_tier: str
created_at: DateTime
@fraiseql.type
class Member:
"""A member of an organization."""
id: ID
identifier: str
name: str
email: Annotated[str, fraiseql.field(requires_scope="read:Member.email", on_deny="mask")]
role: str
created_at: DateTime
@fraiseql.input
class CreateOrganizationInput:
identifier: str
name: str
@fraiseql.input
class CreateMemberInput:
identifier: str
name: str
email: str
role: str = "member"
@fraiseql.query(
sql_source="v_organization",
inject={"org_id": "jwt:org_id"},
)
def organizations(limit: int = 50) -> list[Organization]:
"""
Get organizations for the authenticated tenant.
org_id is injected from the JWT claim and applied as RLS context.
"""
pass
@fraiseql.query(
sql_source="v_member",
inject={"org_id": "jwt:org_id"},
)
def members(limit: int = 100) -> list[Member]:
"""Get members of the authenticated tenant's organization."""
pass
@fraiseql.mutation(
sql_source="fn_create_organization",
operation="CREATE",
inject={"org_id": "jwt:org_id"},
)
def create_organization(input: CreateOrganizationInput) -> Organization:
"""
Create a new organization.
Rust runtime calls fn_create_organization(); pg_notify fires;
observer publishes analytics.organization_created to NATS.
"""
pass
@fraiseql.mutation(
sql_source="fn_create_member",
operation="CREATE",
inject={"org_id": "jwt:org_id"},
)
def create_member(input: CreateMemberInput) -> Member:
"""Add a member to the authenticated organization."""
pass
fraiseql.export_schema("schema.json")
content-service/schema.py
import fraiseql
from fraiseql.scalars import ID, DateTime
@fraiseql.type
class Project:
"""A project belonging to an organization."""
id: ID
identifier: str
name: str
org_id: ID
created_at: DateTime
@fraiseql.type
class Task:
"""A task within a project."""
id: ID
identifier: str
title: str
status: str
created_at: DateTime
@fraiseql.input
class CreateProjectInput:
identifier: str
name: str
@fraiseql.input
class CreateTaskInput:
identifier: str
fk_project: int
title: str
@fraiseql.query(
sql_source="v_project",
inject={"org_id": "jwt:org_id"},
cache_ttl_seconds=30,
)
def projects(limit: int = 50) -> list[Project]:
"""
Get projects for the authenticated organization.
org_id injected from JWT; RLS policy enforces tenant isolation.
"""
pass
@fraiseql.query(
sql_source="v_task",
cache_ttl_seconds=30,
)
def tasks(fk_project: int | None = None, limit: int = 100) -> list[Task]:
"""Get tasks, optionally filtered by project."""
pass
@fraiseql.mutation(
sql_source="fn_create_project",
operation="CREATE",
inject={"org_id": "jwt:org_id"},
invalidates_views=["v_project"],
)
def create_project(input: CreateProjectInput) -> Project:
"""
Create a project for the authenticated organization.
Rust runtime calls fn_create_project(); pg_notify fires;
observer publishes analytics.project_created to NATS.
"""
pass
@fraiseql.mutation(
sql_source="fn_create_task",
operation="CREATE",
invalidates_views=["v_task"],
)
def create_task(input: CreateTaskInput) -> Task:
"""Create a task within a project."""
pass
@fraiseql.subscription(
entity_type="Task",
topic="analytics.task_created",
)
def task_created(fk_project: int | None = None) -> Task:
"""Subscribe to new task creation events in real time."""
pass
fraiseql.export_schema("schema.json")

Analytics Worker — External nats.py Service

Section titled “Analytics Worker — External nats.py Service”
analytics-worker/main.py
"""
Analytics sync worker.
Subscribes to NATS analytics.* events and records them in the shared analytics DB.
Standalone service — not a FraiseQL schema file.
"""
import asyncio
import json
import asyncpg
import nats
async def sync_analytics_event(msg):
"""Record analytics events from any service into the shared analytics DB."""
event_type = msg.subject
data = json.loads(msg.data)
conn = await asyncpg.connect(ANALYTICS_POSTGRES_URL)
try:
await conn.execute(
"""
INSERT INTO tb_analytics_event
(identifier, event_type, tenant_id, data, recorded_at)
VALUES (gen_random_uuid()::text, $1, $2, $3::jsonb, NOW())
""",
event_type,
data.get("tenant_id"),
json.dumps(data),
)
await msg.ack()
except Exception as exc:
print(f"[analytics-worker] ERROR {event_type}: {exc}")
await msg.nak(delay=5)
finally:
await conn.close()
async def main():
nc = await nats.connect(NATS_URL)
js = nc.jetstream()
await js.subscribe(
"analytics.>",
durable="analytics_sync",
cb=sync_analytics_event,
)
print("Analytics worker listening on analytics.>")
await asyncio.sleep(float("inf"))
if __name__ == "__main__":
import os
NATS_URL = os.environ["NATS_URL"]
ANALYTICS_POSTGRES_URL = os.environ["ANALYTICS_DATABASE_URL"]
asyncio.run(main())
-- For each tenant, create an isolated database in the content service
-- (database-per-tenant strategy)
CREATE DATABASE tenant_acme_corp;
CREATE USER tenant_acme_corp WITH PASSWORD '...';
GRANT CONNECT ON DATABASE tenant_acme_corp TO tenant_acme_corp;
-- Then run migrations for each tenant database:
-- fraiseql migrate --database $TENANT_DATABASE_URL
gateway/supergraph.yaml
federation_version: =2.0.0
subgraphs:
auth:
routing_url: http://auth-service:8081/graphql
schema:
subgraph_url: http://auth-service:8081/graphql
content:
routing_url: http://content-service:8082/graphql
schema:
subgraph_url: http://content-service:8082/graphql

Compose the supergraph:

Terminal window
rover supergraph compose --config gateway/supergraph.yaml > supergraph.graphql
  1. Set up environment:

    Terminal window
    cp .env.example .env
    # Edit .env with your database credentials and OIDC config
  2. Start infrastructure:

    Terminal window
    docker-compose up -d postgres nats
  3. Create tenant databases (for database-per-tenant strategy):

    Terminal window
    docker-compose exec postgres psql -U postgres -c \
    "CREATE DATABASE tenant_acme_corp;"
    docker-compose exec postgres psql -U postgres -c \
    "CREATE DATABASE tenant_global_corp;"
  4. Run migrations for each tenant:

    Terminal window
    export TENANT_DATABASE_URL=postgresql://postgres:password@localhost:5432/tenant_acme_corp
    fraiseql migrate
    export TENANT_DATABASE_URL=postgresql://postgres:password@localhost:5432/tenant_global_corp
    fraiseql migrate
  5. Start all FraiseQL services:

    Terminal window
    docker-compose up -d auth-service content-service billing-service analytics-worker

    Expected output from auth-service:

    FraiseQL v2.0.1 — auth-service
    GraphQL endpoint: http://localhost:8081/graphql
    Federation: enabled (subgraph: auth)
    NATS observers: connected (nats://nats:4222)
  6. Create organization as tenant A:

    Terminal window
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN_A" \
    -d '{
    "query": "mutation { createOrganization(input: { identifier: \"acme-corp\", name: \"Acme Corp\" }) { id name subscriptionTier } }"
    }'

    Expected response:

    {
    "data": {
    "createOrganization": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "Acme Corp",
    "subscriptionTier": "free"
    }
    }
    }
  7. Verify tenant isolation (tenant B cannot see tenant A’s data):

    Terminal window
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN_B" \
    -d '{"query": "query { organizations { id name } }"}'

    Expected response (empty list due to RLS):

    {
    "data": {
    "organizations": []
    }
    }
  8. Create project and verify NATS event is published:

    Terminal window
    curl -X POST http://localhost:8080/graphql \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $TOKEN_A" \
    -d '{
    "query": "mutation { createProject(input: { identifier: \"alpha\", name: \"Project Alpha\" }) { id name } }"
    }'

    Wait for the observer to publish to NATS and the analytics worker to consume:

    Terminal window
    sleep 1
  9. Verify event in analytics:

    Terminal window
    docker-compose exec postgres psql -U postgres -d shared_analytics -c \
    "SELECT event_type, tenant_id, recorded_at FROM tb_analytics_event ORDER BY recorded_at DESC LIMIT 5;"

    Expected output:

    event_type | tenant_id | recorded_at
    -------------------------+------------+----------------------------
    analytics.project_created | acme_corp | 2024-01-15 10:30:01+00
    analytics.org_created | acme_corp | 2024-01-15 10:30:00+00
  10. Run tests:

    Terminal window
    pytest tests/test_tenant_isolation.py -v
    pytest tests/test_nats_events.py -v

    Expected output:

    tests/test_tenant_isolation.py::test_tenant_isolation PASSED
    tests/test_tenant_isolation.py::test_cross_tenant_access_blocked PASSED
    tests/test_nats_events.py::test_project_creation_event PASSED
    tests/test_nats_events.py::test_analytics_sync PASSED
# Prometheus metrics exported by each FraiseQL service (labels by tenant from JWT claims)
fraiseql_graphql_requests_total{service="auth", operation="mutation"}
fraiseql_graphql_latency_seconds{service="content", operation="query"}
fraiseql_observer_events_published_total{service="content", subject="analytics.project_created"}
# Analytics worker metrics (custom, in the nats.py worker)
analytics_worker_events_processed_total{event_type="analytics.project_created"}
analytics_worker_consumer_lag{consumer="analytics_sync"}
Terminal window
Error: Database tenant_acme_corp does not exist

Solution:

Terminal window
docker-compose exec postgres psql -U postgres -c "CREATE DATABASE tenant_acme_corp;"
./scripts/create-tenant.sh acme_corp
Terminal window
Error: Cannot connect to NATS at nats://localhost:4222

Check:

  1. Verify NATS is running:

    Terminal window
    docker-compose ps nats
  2. Check [observers] section in fraiseql.toml:

    [observers]
    backend = "nats"
    nats_url = "nats://nats:4222" # Use service name, not localhost, in Docker
  3. Test NATS connection:

    Terminal window
    docker-compose exec nats nats server info

If the analytics table remains empty:

  1. Check FraiseQL observer log:

    Terminal window
    docker-compose logs content-service | grep "observer"
  2. Verify the NATS stream exists:

    Terminal window
    docker-compose exec nats nats stream report
  3. Check the analytics worker consumer:

    Terminal window
    docker-compose exec nats nats consumer report
    docker-compose logs analytics-worker | tail -20

If tenant B can see tenant A’s data:

  1. Verify RLS is enabled:

    \d tb_project
    -- Look for "Policies:" section
  2. Check the policy is active:

    SELECT * FROM pg_policies WHERE tablename = 'tb_project';
  3. Verify org_id JWT claim is being injected:

    Terminal window
    # Decode your JWT and verify it contains org_id claim
    echo $TOKEN_A | cut -d. -f2 | base64 -d | python3 -m json.tool

If cross-service queries fail at the gateway:

  1. Validate each subgraph is reachable:

    Terminal window
    curl http://localhost:8081/graphql -d '{"query": "{ __typename }"}'
    curl http://localhost:8082/graphql -d '{"query": "{ __typename }"}'
  2. Re-compose the supergraph if schema changed:

    Terminal window
    rover supergraph compose --config gateway/supergraph.yaml > supergraph.graphql
  3. Check gateway logs for composition errors:

    Terminal window
    docker-compose logs gateway | grep "error"

If queries are slow with multiple tenants:

  1. Check database connection pool sizes in each service’s fraiseql.toml.

  2. Enable connection pooling at the PostgreSQL level:

    ALTER SYSTEM SET max_connections = 200;
    SELECT pg_reload_conf();
  3. Monitor per-service metrics:

    Terminal window
    curl http://localhost:8081/metrics | grep fraiseql_graphql_latency
  • Database per tenant created and migrations applied
  • RLS policies enabled and tested on all tables
  • NATS cluster deployed with JetStream replication
  • [observers] configured and tested per service
  • Analytics worker running with durable consumer group
  • Monitoring and alerting configured per service
  • Backup strategy for each tenant database
  • Failover procedures documented
  • Load testing completed with concurrent tenants
  • Security audit: tenant isolation verified end-to-end

Advanced Federation

Deep dive on Apollo Federation v2 patterns.

Observers Reference

TOML observer configuration for NATS events.

JWT Injection

Using inject= to pass JWT claims to SQL functions.

Multi-Tenancy Guide

RLS and tenant isolation strategies.