Advanced Federation
Deep dive on federation patterns.
A complete multi-tenant SaaS application combining federation, NATS messaging, and custom resolvers for a production-ready system.
Tenant A ──┐
           ├─► API Gateway (Auth + RBAC)
Tenant B ──┘            |
                        |
            +-----------+-----------+
            |           |           |
       Tenant DB A  Tenant DB B  Master DB
            |                       |
            +──── NATS Events ──────+
                        |
                   Analytics DB

[multi_tenancy]
enabled = true
strategy = "database_per_tenant"
auth_header = "X-Tenant-ID"
# Tenant database pool[databases.tenant]type = "postgresql"pool_max = 20
# Shared services[databases.shared]type = "postgresql"url = "${SHARED_DB_URL}"pool_max = 10
[observers]enabled = truebackend = "nats"nats_url = "${NATS_URL}"-- Tenant databases (one per tenant)CREATE TABLE tb_organization ( pk_org SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, tenant_id UUID NOT NULL, name TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT now(), subscription_tier TEXT NOT NULL DEFAULT 'free');
CREATE TABLE tb_member ( pk_member SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, fk_org INTEGER NOT NULL REFERENCES tb_organization(pk_org), name TEXT NOT NULL, email TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT now());
CREATE TABLE tb_project ( pk_project SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, tenant_id UUID NOT NULL, fk_org INTEGER NOT NULL REFERENCES tb_organization(pk_org), name TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT now());
CREATE TABLE tb_task ( pk_task SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, fk_project INTEGER NOT NULL REFERENCES tb_project(pk_project), title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'todo', created_at TIMESTAMPTZ DEFAULT now());
-- Views with JSONB data columnCREATE VIEW v_organization ASSELECT o.id, o.identifier, jsonb_build_object( 'id', o.id::text, 'identifier', o.identifier, 'name', o.name, 'subscription_tier', o.subscription_tier, 'created_at', o.created_at ) AS data, o.tenant_id, o.pk_orgFROM tb_organization o;
CREATE VIEW v_project ASSELECT p.id, p.identifier, jsonb_build_object( 'id', p.id::text, 'identifier', p.identifier, 'name', p.name, 'org_id', o.id::text, 'created_at', p.created_at ) AS data, p.tenant_id, p.fk_orgFROM tb_project pJOIN tb_organization o ON o.pk_org = p.fk_org;import fraiseqlfrom fraiseql.scalars import IDfrom fraiseql import TenantContextfrom datetime import datetime
@fraiseql.type(database="tenant")
class Organization:
    """Tenant-specific organization."""

    # Declared fields of the type.
    id: ID
    name: str
    created_at: datetime
    subscription_tier: str

    @fraiseql.field_resolver
    async def member_count(self, ctx: TenantContext) -> int:
        """Count organization members."""
        # The sub-select translates this organization's public id into the
        # pk_org value that tb_member.fk_org is compared against.
        member_count_sql = """
            SELECT COUNT(*) as count
            FROM tb_member
            WHERE fk_org = (SELECT pk_org FROM tb_organization WHERE id = $1)
        """
        row = await ctx.db.query_one(member_count_sql, [self.id])
        # Falls back to 0 when the row has no "count" key.
        return row.get("count", 0)
@fraiseql.type(database="tenant")
class Project:
    """Tenant-specific project."""

    # Declared fields of the type.
    id: ID
    name: str
    created_at: datetime

    # Multi-database federation
    organization: Organization = fraiseql.federated(
        database="tenant",
        lookup="fk_org",
    )

    @fraiseql.field_resolver
    async def task_count(self, ctx: TenantContext) -> int:
        """Count tasks in project."""
        # The sub-select translates this project's public id into the
        # pk_project value that tb_task.fk_project is compared against.
        task_count_sql = """
            SELECT COUNT(*) as count
            FROM tb_task
            WHERE fk_project = (SELECT pk_project FROM tb_project WHERE id = $1)
        """
        row = await ctx.db.query_one(task_count_sql, [self.id])
        return row["count"]
# Root queries
@fraiseql.query(sql_source="v_organization")
async def organizations(limit: int = 50) -> list[Organization]:
    """
    Get all organizations for tenant.

    Automatically filtered by tenant_id from context (RLS).
    """
    # Intentionally empty: the function only declares the query's signature
    # for the decorator above, which names v_organization as its SQL source.
@fraiseql.query(sql_source="v_project")
async def projects_by_org(org_id: ID) -> list[Project]:
    """Get projects for organization (with RLS)."""
    # Intentionally empty: the function only declares the query's signature
    # for the decorator above, which names v_project as its SQL source.
# Mutations
@fraiseql.mutation(operation="CREATE")
async def create_project(
    ctx: TenantContext,
    org_id: ID,
    name: str,
) -> Project:
    """Create project and emit event."""
    find_org_sql = """
        SELECT pk_org FROM tb_organization
        WHERE id = $1 AND tenant_id = $2
    """
    # The identifier is a lowercase slug of the name: every run of
    # non-alphanumeric characters is replaced by a single '-'.
    insert_project_sql = """
        INSERT INTO tb_project (tenant_id, fk_org, name, identifier)
        VALUES ($1, $2, $3, lower(regexp_replace($3, '[^a-zA-Z0-9]+', '-', 'g')))
        RETURNING id, identifier, name, created_at
    """

    # Verify ownership (RLS ensures tenant isolation)
    org_row = await ctx.db.query_one(find_org_sql, [org_id, ctx.tenant_id])
    if not org_row:
        raise Exception("Organization not found")

    created = await ctx.db.query_one(
        insert_project_sql,
        [ctx.tenant_id, org_row['pk_org'], name],
    )

    # NOTE(review): the returned row also carries "identifier", which Project
    # does not declare as a field — confirm the type accepts the extra key.
    return Project(**created)
# Observers for event-driven features
@fraiseql.observer(
    entity="Project",
    event="CREATE",
    database="tenant",
)
async def on_project_created_event(project: Project, ctx: TenantContext):
    """
    React to project creation by publishing an analytics event to NATS.

    The payload carries:
    - event_type: the subject the event is published on
    - tenant_id / project_id: string forms of the originating ids
    - timestamp: timezone-aware UTC time in ISO 8601 format
    """
    # Local import keeps this snippet self-contained; the surrounding file
    # only imports `datetime` from the datetime module.
    from datetime import timezone

    subject = "analytics.project_created"
    await fraiseql.nats.publish(
        subject=subject,
        data={
            # The "analytics.>" consumer reads data["event_type"], so the key
            # has to be part of the payload.
            "event_type": subject,
            "tenant_id": str(ctx.tenant_id),
            "project_id": str(project.id),
            # Aware UTC timestamp: a naive local time is ambiguous for a
            # consumer running in another timezone.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
@fraiseql.nats.subscribe( subject="analytics.>", consumer_group="analytics_sync")async def sync_analytics(message): """Sync analytics events to shared database.""" data = message.data await ctx.db.execute( """ INSERT INTO tb_analytics_event (event_type, tenant_id, data, recorded_at) VALUES ($1, $2, $3::jsonb, NOW()) """, [data["event_type"], data["tenant_id"], json.dumps(data)] ) await message.ack()-- For each tenant, create isolated databaseCREATE DATABASE tenant_acme_corp;CREATE USER tenant_acme_corp WITH PASSWORD '...';GRANT CONNECT ON DATABASE tenant_acme_corp TO tenant_acme_corp;
-- Enable RLS for extra safetyALTER TABLE tb_organization ENABLE ROW LEVEL SECURITY;ALTER TABLE tb_project ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON tb_organizationUSING (tenant_id = current_setting('app.tenant_id')::uuid);
CREATE POLICY project_tenant_isolation ON tb_projectUSING (tenant_id = current_setting('app.tenant_id')::uuid);# Cache tenant configurations@fraiseql.query(sql_source="v_tenant_config", cache_ttl=3600)async def tenant_config(ctx: TenantContext) -> dict: """Frequently accessed configuration.""" pass
# Batch operations for multi-tenant deployments
@fraiseql.mutation(operation="CREATE")
async def bulk_create_projects(
    ctx: TenantContext,
    org_id: ID,
    names: list[str],
) -> list[Project]:
    """Batch create projects efficiently."""
    find_org_sql = "SELECT pk_org FROM tb_organization WHERE id = $1 AND tenant_id = $2"
    # The identifier is a lowercase slug of the name: every run of
    # non-alphanumeric characters is replaced by a single '-'.
    insert_project_sql = """
        INSERT INTO tb_project (tenant_id, fk_org, name, identifier)
        VALUES ($1, $2, $3, lower(regexp_replace($3, '[^a-zA-Z0-9]+', '-', 'g')))
        RETURNING id, identifier, name, created_at
    """

    org_row = await ctx.db.query_one(find_org_sql, [org_id, ctx.tenant_id])
    if not org_row:
        raise Exception("Organization not found")

    # One INSERT is awaited per name, in input order.
    # NOTE(review): that is one round trip per project; if the db handle
    # offers a multi-row query, a single INSERT would be cheaper — confirm.
    created = []
    for project_name in names:
        row = await ctx.db.query_one(
            insert_project_sql,
            [ctx.tenant_id, org_row['pk_org'], project_name],
        )
        created.append(Project(**row))

    return created

import pytest
from fraiseql.testing import FraiseQLTestClient
@pytest.fixture
async def client():
    """Provide a FraiseQL test client created from fraiseql.test.toml."""
    test_client = await FraiseQLTestClient.create(
        config="fraiseql.test.toml",
    )
    return test_client
@pytest.mark.asyncio
async def test_tenant_isolation(client):
    """Verify tenants cannot access each other's data."""
    # Create as tenant A
    client.set_tenant("tenant-a")
    created = await client.execute("""
        mutation {
            createOrganization(name: "Org A") { id }
        }
    """)
    org_id = created['data']['createOrganization']['id']

    # Try to access as tenant B
    client.set_tenant("tenant-b")
    lookup = await client.execute(f"""
        query {{
            organization(id: "{org_id}") {{ name }}
        }}
    """)

    # Should return null due to RLS
    assert lookup["data"]["organization"] is None
@pytest.mark.asyncioasync def test_nats_event_publishing(client): """Verify events are published on mutations.""" client.set_tenant("tenant-a")
project = await client.execute(""" mutation { createProject(orgId: "org-1", name: "Test") { id } } """)
# Give NATS time to process await asyncio.sleep(0.5)
# Verify event was published events = await get_published_events("analytics.project_created") assert len(events) >= 1# Track per-tenant metrics (Prometheus labels)fraiseql_tenant_requests_total{tenant_id="acme_corp", endpoint="/graphql"}fraiseql_tenant_latency_seconds{tenant_id="acme_corp", operation="query"}fraiseql_tenant_errors_total{tenant_id="acme_corp", error_type="validation"}
# Monitor event processingfraiseql_nats_events_processed{tenant_id="acme_corp", event_type="project_created"}fraiseql_nats_consumer_lag{consumer="analytics_sync"}Set up environment:
cp .env.example .env# Edit .env with your database credentialsStart services:
docker-compose up -d postgres natsCreate tenant databases (for database-per-tenant strategy):
docker-compose exec postgres psql -U postgres -c \ "CREATE DATABASE tenant_acme_corp;"docker-compose exec postgres psql -U postgres -c \ "CREATE DATABASE tenant_global_corp;"Run migrations for each tenant:
export TENANT_ID=acme_corpconfiture migrate
export TENANT_ID=global_corpconfiture migrateStart FraiseQL server:
fraiseql runExpected output:
🍓 FraiseQL v0.9.0 GraphQL endpoint: http://localhost:8080/graphql Multi-tenancy: database_per_tenant NATS: connected Tenants: 2 configuredCreate organization as tenant A:
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "X-Tenant-ID: acme_corp" \ -H "Authorization: Bearer $TOKEN_A" \ -d '{ "query": "mutation { createOrganization(name: \"Acme Corp\") { id name subscriptionTier } }" }'Expected response:
{ "data": { "createOrganization": { "id": "org-abc123", "name": "Acme Corp", "subscriptionTier": "free" } }}Verify tenant isolation (tenant B cannot see tenant A’s data):
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "X-Tenant-ID: global_corp" \ -H "Authorization: Bearer $TOKEN_B" \ -d '{ "query": "query { organizations { id name } }" }'Expected response (empty list):
{ "data": { "organizations": [] }}Create project and verify event is published:
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "X-Tenant-ID: acme_corp" \ -H "Authorization: Bearer $TOKEN_A" \ -d '{ "query": "mutation { createProject(orgId: \"org-abc123\", name: \"Project Alpha\") { id name taskCount } }" }'Wait for NATS event:
sleep 1Verify event in analytics:
docker-compose exec postgres psql -U postgres -d shared_analytics -c \ "SELECT * FROM tb_analytics_event WHERE tenant_id = 'acme_corp';"Expected output:
 event_type                | tenant_id | data                | recorded_at
---------------------------+-----------+---------------------+---------------
 analytics.project_created | acme_corp | {"project_id": ...} | 2024-01-15...
Run tests:
pytest tests/test_tenant_isolation.py -vpytest tests/test_nats_events.py -vExpected output:
tests/test_tenant_isolation.py::test_tenant_isolation PASSEDtests/test_tenant_isolation.py::test_cross_tenant_access_blocked PASSEDtests/test_nats_events.py::test_project_creation_event PASSEDtests/test_nats_events.py::test_analytics_sync PASSEDError: Database tenant_acme_corp does not existSolution:
# Create database manuallydocker-compose exec postgres psql -U postgres -c "CREATE DATABASE tenant_acme_corp;"
# Or use the tenant provisioning script./scripts/create-tenant.sh acme_corpError: Cannot connect to NATS at nats://localhost:4222Check:
Verify NATS is running:
docker-compose ps natsCheck NATS URL in configuration:
[nats]servers = ["nats://nats:4222"] # Use service name, not localhost in DockerTest NATS connection:
docker-compose exec nats nats server infoIf analytics table remains empty:
Check NATS consumer is running:
docker-compose logs fraiseql | grep "analytics_sync"Verify stream exists:
docker-compose exec nats nats stream reportCheck consumer group:
docker-compose exec nats nats consumer reportIf tenant B can see tenant A’s data:
Verify RLS is enabled:
\d tb_organization-- Look for "Policies:"Check policy is active:
SELECT * FROM pg_policies WHERE tablename = 'tb_organization';Verify tenant_id is being set:
# Check HTTP headers are being passedcurl -v http://localhost:8080/graphql -H "X-Tenant-ID: acme_corp" ...If queries are slow with multiple tenants:
Check database connection pool:
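[databases.tenant]
pool_max = 50 # Increase if needed
Raise the PostgreSQL connection limit so the larger pools fit (this is a server-wide cap rather than connection pooling, and it takes effect only after a server restart; for actual pooling, put a pooler such as PgBouncer in front of PostgreSQL):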
ALTER SYSTEM SET max_connections = 200;Monitor per-tenant metrics:
curl http://localhost:8080/metrics | grep fraiseql_tenantIf cross-database queries fail:
Verify all databases are accessible:
fraiseql config validateCheck foreign data wrappers (if using PostgreSQL FDW):
\des -- List foreign serversTest individual database connections:
psql $DATABASE_URL_TENANT_A -c "SELECT 1"psql $DATABASE_URL_TENANT_B -c "SELECT 1"If tenant isolation tests fail:
Clear test data:
docker-compose exec postgres psql -U postgres -d tenant_acme_corp -c \ "TRUNCATE tb_organization, tb_project, tb_member CASCADE;"Ensure test client sets tenant correctly:
client.set_tenant("tenant-a") # Must be called before each test
Check that the test database is separate from the dev database.
Advanced Federation
Deep dive on federation patterns.
Advanced NATS
Event streaming advanced patterns.
Custom Resolvers
Business logic integration.
Multi-Tenancy Guide
RLS and tenant isolation strategies.