Skip to content

Advanced Patterns

Master advanced FraiseQL patterns for production applications.

Implement undo/restore functionality without losing data.

from datetime import datetime
from fraiseql import FraiseQL
@fraiseql.type
class Post:
id: ID
title: str
content: str
created_at: datetime
deleted_at: datetime | None # NULL = not deleted
@fraiseql.query
def posts() -> list[Post]:
"""Get all non-deleted posts."""
# FraiseQL automatically filters WHERE deleted_at IS NULL
pass
@fraiseql.query
def posts_including_deleted() -> list[Post]:
"""Get all posts including deleted ones (admin only)."""
pass
@fraiseql.mutation
def delete_post(id: ID) -> Post:
"""Soft delete: set deleted_at timestamp."""
# UPDATE posts SET deleted_at = NOW() WHERE id = ?
pass
@fraiseql.mutation
def restore_post(id: ID) -> Post:
"""Restore deleted post."""
# UPDATE posts SET deleted_at = NULL WHERE id = ?
pass
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
deleted_at TIMESTAMP -- NULL = not deleted
);
-- Create index for faster queries
CREATE INDEX idx_posts_deleted_at ON posts(deleted_at)
WHERE deleted_at IS NULL;
-- View for non-deleted posts
CREATE VIEW v_posts_active AS
SELECT * FROM posts WHERE deleted_at IS NULL;

Delete parent and children (soft):

@fraiseql.mutation
def delete_post_with_comments(post_id: ID) -> Post:
"""Delete post and all its comments."""
# In database with triggers:
# 1. UPDATE posts SET deleted_at = NOW() WHERE id = ?
# 2. UPDATE comments SET deleted_at = NOW() WHERE post_id = ?
pass

Database trigger (PostgreSQL):

CREATE TRIGGER cascade_post_delete
AFTER UPDATE ON posts
FOR EACH ROW
WHEN (NEW.deleted_at IS NOT NULL AND OLD.deleted_at IS NULL)
BEGIN
UPDATE comments SET deleted_at = NOW()
WHERE post_id = NEW.id AND deleted_at IS NULL;
END;

Track all changes for compliance and debugging.

from datetime import datetime
from enum import Enum
class AuditAction(str, Enum):
CREATE = "CREATE"
UPDATE = "UPDATE"
DELETE = "DELETE"
RESTORE = "RESTORE"
@fraiseql.type
class AuditLog:
id: ID
entity_type: str # "Post", "User", etc.
entity_id: ID
action: AuditAction
old_values: dict # JSON of previous values
new_values: dict # JSON of current values
user_id: ID
timestamp: datetime
@fraiseql.query
def audit_logs(entity_type: str, entity_id: ID) -> list[AuditLog]:
"""Get audit trail for entity."""
pass
@fraiseql.mutation
def update_post(id: ID, title: str, content: str) -> Post:
"""Update post with audit logging."""
# 1. Get old values
# 2. Update post
# 3. Log change: { entity_type: "Post", entity_id: id, action: "UPDATE", old_values: {...}, new_values: {...} }
pass
CREATE TABLE audit_logs (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(50) NOT NULL,
entity_id BIGINT NOT NULL,
action VARCHAR(20) NOT NULL,
old_values JSONB,
new_values JSONB,
user_id BIGINT NOT NULL,
timestamp TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_audit_logs_entity ON audit_logs(entity_type, entity_id);
CREATE INDEX idx_audit_logs_user ON audit_logs(user_id);
CREATE INDEX idx_audit_logs_timestamp ON audit_logs(timestamp DESC);
-- View for audit timeline
CREATE VIEW v_audit_timeline AS
SELECT
id, entity_type, entity_id, action,
old_values, new_values, user_id, timestamp
FROM audit_logs
ORDER BY timestamp DESC;
-- Function to log post changes
CREATE OR REPLACE FUNCTION log_post_changes()
RETURNS TRIGGER AS '
BEGIN
INSERT INTO audit_logs (entity_type, entity_id, action, old_values, new_values, user_id, timestamp)
VALUES (
''Post'',
NEW.id,
CASE
WHEN TG_OP = ''INSERT'' THEN ''CREATE''
WHEN TG_OP = ''UPDATE'' THEN ''UPDATE''
WHEN TG_OP = ''DELETE'' THEN ''DELETE''
END,
CASE
WHEN TG_OP = ''INSERT'' THEN NULL
ELSE row_to_json(OLD)
END,
CASE
WHEN TG_OP = ''DELETE'' THEN NULL
ELSE row_to_json(NEW)
END,
COALESCE(current_setting(''app.user_id'')::BIGINT, NULL),
NOW()
);
RETURN NEW;
END;
' LANGUAGE plpgsql;
-- Trigger on posts table
CREATE TRIGGER posts_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON posts
FOR EACH ROW
EXECUTE FUNCTION log_post_changes();

Pass the user ID from the JWT into your mutation functions using inject=, then set the PostgreSQL session variable inside the SQL function:

@fraiseql.mutation(
sql_source="fn_update_post",
operation="UPDATE",
inject={"user_id": "jwt:sub"} # JWT 'sub' is passed as SQL param
)
def update_post(id: ID, title: str, content: str) -> Post:
"""Update post. fn_update_post sets app.user_id for audit triggers."""
pass

Track content versions and allow rollback.

@fraiseql.type
class PostVersion:
id: ID
post_id: ID
version_number: int
title: str
content: str
created_by: ID
created_at: datetime
@fraiseql.type
class Post:
id: ID
title: str
content: str
current_version: int
versions: list[PostVersion]
created_at: datetime
updated_at: datetime
@fraiseql.query
def post_versions(post_id: ID) -> list[PostVersion]:
"""Get all versions of a post."""
pass
@fraiseql.mutation
def update_post(id: ID, title: str, content: str) -> Post:
"""Update post (creates new version)."""
# 1. Get current version_number
# 2. Create new version record
# 3. Update post with new content
pass
@fraiseql.mutation
def revert_to_version(post_id: ID, version_number: int) -> Post:
"""Revert to specific version."""
# 1. Get version content
# 2. Create new version with old content
# 3. Update post
pass
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL,
current_version INT DEFAULT 1,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE post_versions (
id BIGSERIAL PRIMARY KEY,
post_id BIGINT NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
version_number INT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_by BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(post_id, version_number)
);
CREATE INDEX idx_post_versions_post_id ON post_versions(post_id);
CREATE INDEX idx_post_versions_created_at ON post_versions(created_at DESC);

Efficient pagination for large datasets.

@fraiseql.type
class PageInfo:
has_next_page: bool
has_prev_page: bool
start_cursor: str | None
end_cursor: str | None
@fraiseql.type
class PostEdge:
cursor: str
node: Post
@fraiseql.type
class PostConnection:
edges: list[PostEdge]
pageInfo: PageInfo
total_count: int
@fraiseql.query
def posts(
first: int = 10,
after: str | None = None
) -> PostConnection:
"""Cursor-based pagination."""
# Cursor = base64(id=123)
# Query: SELECT * FROM posts WHERE id > 123 LIMIT 11
# Check if 11 rows returned to know if has_next_page
pass
-- GOOD: Use keyset pagination (efficient)
SELECT * FROM posts
WHERE id > ? AND created_at > ?
ORDER BY created_at DESC
LIMIT 10;
-- BAD: Avoid OFFSET (scans all rows)
SELECT * FROM posts
ORDER BY created_at DESC
LIMIT 10 OFFSET 1000000; -- Scans 1,000,010 rows!
import base64
def encode_cursor(post_id: int, created_at: datetime) -> str:
"""Encode cursor for pagination."""
data = f"{post_id}:{created_at.isoformat()}"
return base64.b64encode(data.encode()).decode()
def decode_cursor(cursor: str) -> tuple[int, datetime]:
"""Decode cursor."""
data = base64.b64decode(cursor.encode()).decode()
post_id, created_at = data.split(":")
return int(post_id), datetime.fromisoformat(created_at)

Fine-grained permission management.

from enum import Enum
class Role(str, Enum):
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
class Permission(str, Enum):
READ_POST = "read:post"
CREATE_POST = "create:post"
UPDATE_POST = "update:post"
DELETE_POST = "delete:post"
MANAGE_USERS = "manage:users"
CREATE TABLE roles (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(50) NOT NULL UNIQUE,
description TEXT
);
CREATE TABLE permissions (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
description TEXT
);
CREATE TABLE role_permissions (
role_id BIGINT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
permission_id BIGINT NOT NULL REFERENCES permissions(id) ON DELETE CASCADE,
PRIMARY KEY (role_id, permission_id)
);
CREATE TABLE user_roles (
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role_id BIGINT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
PRIMARY KEY (user_id, role_id)
);
-- View: User permissions
CREATE VIEW v_user_permissions AS
SELECT DISTINCT
u.id AS user_id,
p.name AS permission
FROM users u
JOIN user_roles ur ON u.id = ur.user_id
JOIN role_permissions rp ON ur.role_id = rp.role_id
JOIN permissions p ON rp.permission_id = p.id;
@fraiseql.query(requires_scope="read:post")
def posts() -> list[Post]:
"""User must have read:post permission."""
pass
@fraiseql.mutation(requires_scope="create:post")
def create_post(title: str, content: str) -> Post:
"""User must have create:post permission."""
pass

Scope enforcement is declared on each query/mutation via requires_scope=. FraiseQL validates the JWT scopes at the Rust runtime level before the query reaches the database — no Python permission check is needed.

Isolate data between customers.

@fraiseql.type
class Post:
id: ID
tenant_id: ID
title: str
content: str
# FraiseQL enforces: tenant_id = current_user.tenant_id
@fraiseql.query
def posts() -> list[Post]:
"""Get posts for current tenant only."""
# Automatically filters WHERE tenant_id = current_tenant_id
pass

PostgreSQL RLS:

CREATE POLICY posts_tenant_isolation ON posts
USING (tenant_id = current_setting('app.tenant_id')::BIGINT)
WITH CHECK (tenant_id = current_setting('app.tenant_id')::BIGINT);
ALTER TABLE posts ENABLE ROW LEVEL SECURITY;
# Configure each tenant's database in fraiseql.toml.
# Use inject= to route by tenant from JWT claims:
```toml
[databases.tenant_primary]
url = "${TENANT_PRIMARY_DB_URL}"
[databases.tenant_secondary]
url = "${TENANT_SECONDARY_DB_URL}"
@fraiseql.query(sql_source="v_post", inject={"tenant_id": "jwt:tenant_id"})
def posts() -> list[Post]:
"""FraiseQL passes tenant_id from JWT to the view's RLS policy."""
pass
## Nested Document Pattern
Denormalize hierarchical data for performance.
```python
@fraiseql.type
class Comment:
id: ID
text: str
author: str
created_at: datetime
@fraiseql.type
class Post:
id: ID
title: str
content: str
comments: list[Comment] # Denormalized
@fraiseql.query
def post(id: ID) -> Post:
"""Get post with nested comments."""
# Single query with JOINs or array aggregation
# SELECT p.*, array_agg(c.*) as comments FROM posts p ...
pass

When to use:

  • Comments on a post
  • Replies to a comment
  • Attachments on a document
  • Small nested collections (< 100 items)

When NOT to use:

  • Large collections (> 10k items)
  • Frequently updated sub-items
  • Need independent sorting/filtering

Prevent expensive queries:

@fraiseql.query
def users() -> list[User]:
"""Limit nested queries."""
# Complexity: 1 (user list) + 10 (posts per user) + 10 (comments per post)
# = 1 + 10*10 + 10*10*10 = 1111
# Too high! Set limit.
pass
@fraiseql.type
class User:
id: ID
name: str
posts: list[Post] # Automatically batched
@fraiseql.query
@cached(ttl=3600)
def user_stats(user_id: ID) -> UserStats:
"""Cache expensive aggregation."""
# SELECT COUNT(*) as posts, SUM(...) as likes, etc.
# Executed once per hour max
pass

FraiseQL decorator function bodies are never executed — they are compile-time schema declarations only. Input validation and error handling belong in the PostgreSQL function:

@fraiseql.mutation(sql_source="fn_create_post", operation="CREATE")
def create_post(title: str, content: str) -> Post:
"""Create a new post. Validation is enforced by fn_create_post."""
pass
db/schema/03_functions/fn_create_post.sql
CREATE OR REPLACE FUNCTION fn_create_post(p_title TEXT, p_content TEXT)
RETURNS SETOF v_post LANGUAGE plpgsql AS $$
BEGIN
IF length(p_title) < 3 THEN
RAISE EXCEPTION 'Title too short'
USING ERRCODE = 'P0001', HINT = 'INVALID_INPUT';
END IF;
IF length(p_content) < 10 THEN
RAISE EXCEPTION 'Content too short'
USING ERRCODE = 'P0002', HINT = 'INVALID_INPUT';
END IF;
-- ... insert and return
END;
$$;

Error codes are set via the HINT clause of RAISE EXCEPTION. FraiseQL maps this to extensions.code in the GraphQL error response. See Error Handling for the full error code reference.

import pytest
@pytest.fixture
def user():
return create_user(name="Test User")
@pytest.fixture
def post(user):
return create_post(user_id=user.id, title="Test")
def test_soft_delete(post):
"""Test soft delete."""
delete_post(post.id)
assert get_post(post.id) is None # Filtered out
assert get_post(post.id, include_deleted=True) is not None
def test_audit_logging(post):
"""Test audit trail."""
update_post(post.id, title="Updated")
logs = get_audit_logs("Post", post.id)
assert len(logs) == 2 # CREATE + UPDATE
assert logs[1].action == "UPDATE"
assert logs[1].old_values["title"] == "Test"
assert logs[1].new_values["title"] == "Updated"
def test_versioning(post):
"""Test post versioning."""
update_post(post.id, title="V2")
update_post(post.id, title="V3")
versions = get_post_versions(post.id)
assert len(versions) == 3
assert versions[2].version_number == 3
PatternUse CaseComplexityPerformance
Soft DeleteUndo/restoreLowHigh
Audit TrailComplianceMediumMedium
VersioningContent historyMediumMedium
Cursor PaginationLarge datasetsLowHigh
RBACFine-grained permissionsMediumMedium
Multi-tenancyData isolationHighHigh
Nested DocumentsSmall nested dataLowHigh
CachingExpensive queriesLowHigh

Choose patterns based on your requirements, not “because everyone does it.”



  1. Choose relevant patterns for your use case from the summary table
  2. Implement one pattern at a time - don’t mix too many patterns initially
  3. Test thoroughly - advanced patterns need comprehensive test coverage
  4. Monitor performance - ensure patterns don’t create unexpected overhead
  5. Document your patterns - help other developers understand your choices
  6. Share and iterate - get feedback from team, refine as needed