Real-Time Features
Complete subscriptions reference.
A complete example of a Google Docs-like collaborative editor using FraiseQL and NATS event streaming.
Repository: github.com/fraiseql/examples/realtime-collaboration
-- Documents edited collaboratively; one row per document.
CREATE TABLE tb_document (
    -- Internal integer key; other tables reference it as fk_document.
    pk_document SERIAL PRIMARY KEY,
    -- Externally visible UUID, generated on insert.
    id          UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    identifier  TEXT NOT NULL UNIQUE,
    title       TEXT NOT NULL,
    -- Full document text; starts out empty.
    content     TEXT NOT NULL DEFAULT '',
    fk_owner    INTEGER NOT NULL REFERENCES tb_user(pk_user),
    created_at  TIMESTAMPTZ DEFAULT now(),
    updated_at  TIMESTAMPTZ DEFAULT now()
);
-- Access grants: which user may do what on which document.
-- At most one row per (document, user) pair.
CREATE TABLE tb_document_share (
    pk_share    SERIAL PRIMARY KEY,
    id          UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    fk_document INTEGER NOT NULL REFERENCES tb_document(pk_document),
    fk_user     INTEGER NOT NULL REFERENCES tb_user(pk_user),
    -- Expected values: 'view', 'edit', 'admin'.
    -- (Comment kept on its own line: a trailing "--" comment on a collapsed
    -- line would comment out every column that follows it.)
    permission  TEXT NOT NULL DEFAULT 'view',
    shared_at   TIMESTAMPTZ DEFAULT now(),
    UNIQUE (fk_document, fk_user)
);
-- Edit log: one row per operation applied to a document.
CREATE TABLE tb_document_edit (
    pk_edit     SERIAL PRIMARY KEY,
    id          UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    fk_document INTEGER NOT NULL REFERENCES tb_document(pk_document),
    fk_user     INTEGER NOT NULL REFERENCES tb_user(pk_user),
    -- Operation payload, e.g. { type: "insert", position: 100, content: "text" }.
    -- (Comment kept on its own line so it cannot swallow the next column.)
    operation   JSONB NOT NULL,
    timestamp   TIMESTAMPTZ DEFAULT now()
);
-- Views with JSONB data column

-- v_document: one row per document, with its fields packed into a JSONB
-- "data" column. The owner is exposed by the user's UUID (owner_id), not by
-- the internal integer key.
CREATE VIEW v_document AS
SELECT
    d.id,
    d.identifier,
    jsonb_build_object(
        'id', d.id::text,
        'identifier', d.identifier,
        'title', d.title,
        'content', d.content,
        'owner_id', u.id::text,
        'created_at', d.created_at,
        'updated_at', d.updated_at
    ) AS data
FROM tb_document d
JOIN tb_user u ON u.pk_user = d.fk_owner;
-- v_document_edit: one row per stored edit, with its fields packed into a
-- JSONB "data" column. Document and user are exposed by their UUIDs; the
-- internal fk_document key is also selected as a plain column.
CREATE VIEW v_document_edit AS
SELECT
    e.id,
    jsonb_build_object(
        'id', e.id::text,
        'document_id', d.id::text,
        'user_id', u.id::text,
        'operation', e.operation,
        'timestamp', e.timestamp
    ) AS data,
    e.fk_document
FROM tb_document_edit e
JOIN tb_document d ON d.pk_document = e.fk_document
JOIN tb_user u ON u.pk_user = e.fk_user;

from datetime import datetime

import fraiseql
from fraiseql.scalars import ID
@fraiseql.type
class Document:
    """A collaboratively edited document as exposed by the API."""

    id: ID
    title: str
    content: str
    owner_id: ID
    created_at: datetime
    updated_at: datetime
@fraiseql.type
class DocumentShare:
    """A grant giving one user access to one document."""

    id: ID
    document_id: ID
    user_id: ID
    # One of "view", "edit", "admin".
    permission: str
    shared_at: datetime
@fraiseql.type
class DocumentEdit:
    """A single edit operation applied to a document by a user."""

    id: ID
    document_id: ID
    user_id: ID
    # Operation payload, e.g. { type: "insert", position: 100, content: "text" }.
    operation: dict
    timestamp: datetime
@fraiseql.type
class DocumentPresence:
    """A user's cursor position and selection range within a document."""

    user_id: ID
    cursor_position: int
    selection_start: int
    selection_end: int


# Subscribe to document changes
subscription {
  documentChanged(documentId: "doc123") {
    id
    content
    updatedAt
  }
}
# Subscribe to presence (who's editing)
subscription {
  presenceUpdated(documentId: "doc123") {
    userId
    cursorPosition
    user {
      name
      email
    }
  }
}
# Subscribe to comments
subscription {
  commentAdded(documentId: "doc123") {
    id
    content
    author {
      name
    }
    createdAt
  }
}

# Apply an edit operation
mutation {
  applyEdit(
    documentId: "doc123"
    operation: {
      type: "insert"
      position: 150
      content: "new text"
    }
  ) {
    id
    operation
    timestamp
  }
}
# Update cursor/selection position
mutation {
  updatePresence(
    documentId: "doc123"
    cursorPosition: 250
    selectionStart: 200
    selectionEnd: 300
  ) {
    userId
    cursorPosition
  }
}
# Add comment to document range
mutation {
  addComment(
    documentId: "doc123"
    startPosition: 100
    endPosition: 150
    content: "This needs clarification"
  ) {
    id
    content
    author {
      name
    }
  }
}

from operational_transform import apply_operation, transform_operations
import json
from datetime import timezone
from typing import Optional

# NOTE(review): `ctx`, `redis` and `get_current_user` are not defined in the
# visible snippets — presumably supplied by the application; confirm.

# Lifetime of a presence entry in Redis: one hour.
_PRESENCE_TTL_SECONDS = 3600

# Access levels that allow changing a document's content.
_EDIT_PERMISSIONS = ("edit", "admin")


async def _get_document_permission(document_id: str, user_id: str) -> Optional[str]:
    """Return the user's access level on a document, or None if they have none.

    The document owner is reported as 'admin' even without a row in
    tb_document_share; any other user gets the permission stored in their
    share row. None is also returned when the document or user does not exist.
    """
    row = await ctx.db.query_one(
        """
        SELECT CASE
                   WHEN d.fk_owner = u.pk_user THEN 'admin'
                   ELSE s.permission
               END AS permission
        FROM tb_document d
        JOIN tb_user u ON u.id = $2
        LEFT JOIN tb_document_share s
               ON s.fk_document = d.pk_document
              AND s.fk_user = u.pk_user
        WHERE d.id = $1
        """,
        [document_id, user_id],
    )
    return row['permission'] if row else None


@fraiseql.mutation
async def apply_edit(info, document_id: str, operation: dict) -> DocumentEdit:
    """Apply an edit to a document, transforming it against concurrent edits.

    Args:
        info: Resolver info, passed to get_current_user() to identify the caller.
        document_id: Public id (tb_document.id) of the document to edit.
        operation: Edit operation, e.g.
            {"type": "insert", "position": 100, "content": "text"}.
            An optional "timestamp" entry marks the state the client last saw;
            edits stored after it are used to transform this operation. When
            it is absent the operation is applied to the current content as-is.

    Returns:
        The stored edit, carrying the transformed operation.

    Raises:
        PermissionError: The caller is neither the owner nor holds an
            'edit'/'admin' share on the document.
        ValueError: The document does not exist.
    """
    user_id = get_current_user(info)

    permission = await _get_document_permission(document_id, user_id)
    if permission not in _EDIT_PERMISSIONS:
        raise PermissionError("You don't have permission to edit this document")

    # Get current document and pending operations
    doc = await ctx.db.query_one(
        "SELECT pk_document, content FROM tb_document WHERE id = $1",
        [document_id],
    )
    if not doc:
        raise ValueError("Document not found")

    # The timestamp is optional: the documented applyEdit payload carries only
    # type/position/content, so indexing operation['timestamp'] would fail.
    base_timestamp = operation.get('timestamp')
    pending_ops = []
    if base_timestamp is not None:
        pending_ops = await ctx.db.query(
            """
            SELECT operation FROM tb_document_edit
            WHERE fk_document = $1 AND timestamp > $2
            ORDER BY timestamp ASC
            """,
            [doc['pk_document'], base_timestamp],
        )

    # Transform operation against pending operations
    transformed_op = operation
    for pending_op in pending_ops:
        transformed_op = transform_operations(transformed_op, pending_op['operation'])

    # Apply transformed operation to document
    new_content = apply_operation(doc['content'], transformed_op)

    # Save operation and updated document
    # NOTE(review): the read above and the two writes below are not wrapped in
    # a transaction here, so concurrent edits could overwrite each other —
    # confirm whether the database layer provides one.
    await ctx.db.execute(
        """
        UPDATE tb_document
        SET content = $1, updated_at = NOW()
        WHERE pk_document = $2
        """,
        [new_content, doc['pk_document']],
    )

    edit = await ctx.db.query_one(
        """
        INSERT INTO tb_document_edit (fk_document, fk_user, operation)
        VALUES ($1, (SELECT pk_user FROM tb_user WHERE id = $2), $3::jsonb)
        RETURNING id, operation, timestamp
        """,
        [doc['pk_document'], user_id, json.dumps(transformed_op)],
    )

    # Broadcast to other connected users
    await fraiseql.nats.publish(
        subject=f"documents:changed:{document_id}",
        data={
            'document_id': document_id,
            'operation': transformed_op,
            'user_id': user_id,
            'content': new_content,
        },
    )

    # RETURNING yields only id/operation/timestamp; supply the other declared
    # DocumentEdit fields explicitly.
    return DocumentEdit(document_id=document_id, user_id=user_id, **edit)


@fraiseql.subscription
async def document_changed(info, document_id: str):
    """Subscribe to content changes of a document the caller can access.

    Yields one dict per event published on "documents:changed:<document_id>",
    with keys 'id', 'content' and 'updatedAt' (the time the event was relayed,
    as a timezone-aware UTC datetime).

    Raises:
        PermissionError: The caller is neither the owner nor has a share.
    """
    user_id = get_current_user(info)

    # Check permission (owner or any share level)
    if await _get_document_permission(document_id, user_id) is None:
        raise PermissionError("You don't have access to this document")

    async for event in fraiseql.nats.subscribe(f'documents:changed:{document_id}'):
        yield {
            'id': event['document_id'],
            'content': event['content'],
            'updatedAt': datetime.now(timezone.utc),
        }


@fraiseql.mutation
async def update_presence(
    info,
    document_id: str,
    cursor_position: int,
    selection_start: int = 0,
    selection_end: int = 0,
) -> DocumentPresence:
    """Update the caller's cursor/selection and broadcast it.

    Args:
        info: Resolver info, passed to get_current_user() to identify the caller.
        document_id: Public id of the document being edited.
        cursor_position: Current cursor offset.
        selection_start: Start of the selection (default 0).
        selection_end: End of the selection (default 0).

    Returns:
        The presence state that was stored and published.

    Raises:
        PermissionError: The caller is neither the owner nor has a share.
    """
    user_id = get_current_user(info)

    if await _get_document_permission(document_id, user_id) is None:
        raise PermissionError("You don't have access to this document")

    # Store presence in Redis (ephemeral, expires after 1 hour)
    await redis.setex(
        f"presence:{document_id}:{user_id}",
        _PRESENCE_TTL_SECONDS,
        json.dumps({
            'cursor_position': cursor_position,
            'selection_start': selection_start,
            'selection_end': selection_end,
            'updated_at': datetime.now(timezone.utc).isoformat(),
        }),
    )

    # Broadcast presence update
    await fraiseql.nats.publish(
        subject=f"documents:presence:{document_id}",
        data={
            'user_id': user_id,
            'cursor_position': cursor_position,
            'selection_start': selection_start,
            'selection_end': selection_end,
        },
    )

    return DocumentPresence(
        user_id=user_id,
        cursor_position=cursor_position,
        selection_start=selection_start,
        selection_end=selection_end,
    )
# Permission values accepted when sharing a document.
_VALID_PERMISSIONS = ("view", "edit", "admin")


@fraiseql.subscription
async def presence_updated(info, document_id: str):
    """Subscribe to presence updates (cursors, selections) for a document.

    Yields one dict per event published on "documents:presence:<document_id>",
    with keys 'userId', 'cursorPosition', 'selectionStart' and 'selectionEnd'.

    Raises:
        PermissionError: The caller is neither the document's owner nor has a
            row in tb_document_share for it.
    """
    # NOTE(review): `ctx` and `get_current_user` are not defined in the visible
    # snippets — presumably supplied by the application; confirm.
    user_id = get_current_user(info)

    # Only the owner or users the document is shared with may watch presence.
    access = await ctx.db.query_one(
        """
        SELECT 1 AS allowed
        FROM tb_document d
        JOIN tb_user u ON u.id = $2
        WHERE d.id = $1
          AND (
              d.fk_owner = u.pk_user
              OR EXISTS (
                  SELECT 1 FROM tb_document_share s
                  WHERE s.fk_document = d.pk_document
                    AND s.fk_user = u.pk_user
              )
          )
        """,
        [document_id, user_id],
    )
    if not access:
        raise PermissionError("You don't have access to this document")

    async for event in fraiseql.nats.subscribe(f'documents:presence:{document_id}'):
        yield {
            'userId': event['user_id'],
            'cursorPosition': event['cursor_position'],
            'selectionStart': event['selection_start'],
            'selectionEnd': event['selection_end'],
        }


@fraiseql.mutation
async def share_document(
    info,
    document_id: str,
    user_id: str,
    permission: str,
) -> DocumentShare:
    """Share a document with another user, or change an existing share.

    Args:
        info: Resolver info, passed to get_current_user() to identify the caller.
        document_id: Public id of the document to share.
        user_id: Public id of the user receiving access.
        permission: One of "view", "edit", "admin".

    Returns:
        The created or updated share.

    Raises:
        ValueError: `permission` is not a known level, or the recipient does
            not exist.
        PermissionError: The caller does not own the document.
    """
    if permission not in _VALID_PERMISSIONS:
        raise ValueError(
            f"Invalid permission {permission!r}; expected one of {_VALID_PERMISSIONS}"
        )

    current_user_id = get_current_user(info)

    # Verify ownership
    doc = await ctx.db.query_one(
        """
        SELECT pk_document FROM tb_document
        WHERE id = $1 AND fk_owner = (SELECT pk_user FROM tb_user WHERE id = $2)
        """,
        [document_id, current_user_id],
    )
    if not doc:
        raise PermissionError("You can't share documents you don't own")

    # Resolve the recipient up front: an unknown id would otherwise reach the
    # INSERT as a NULL fk_user and fail on the NOT NULL constraint.
    recipient = await ctx.db.query_one(
        "SELECT pk_user FROM tb_user WHERE id = $1",
        [user_id],
    )
    if not recipient:
        raise ValueError("User not found")

    # Re-sharing with the same user updates the permission in place.
    share = await ctx.db.query_one(
        """
        INSERT INTO tb_document_share (fk_document, fk_user, permission)
        VALUES ($1, $2, $3)
        ON CONFLICT (fk_document, fk_user) DO UPDATE
        SET permission = EXCLUDED.permission
        RETURNING id, permission, shared_at
        """,
        [doc['pk_document'], recipient['pk_user'], permission],
    )

    # Notify recipient
    # NOTE(review): the subject is not user-specific, so presumably consumers
    # filter on the 'user_id' field of the payload — confirm.
    await fraiseql.nats.publish(
        subject="share:received",
        data={
            'document_id': document_id,
            'user_id': user_id,
            'permission': permission,
        },
    )

    # RETURNING yields only id/permission/shared_at; supply the other declared
    # DocumentShare fields explicitly.
    return DocumentShare(document_id=document_id, user_id=user_id, **share)


@fraiseql.query(sql_source="v_document_edit")
def document_history(document_id: str, first: int = 50) -> list[DocumentEdit]:
    """Get edit history for a document in chronological order."""
    # NOTE(review): the body is intentionally empty; presumably the framework
    # resolves this query from the v_document_edit view — confirm.
    pass


# Clone the example
git clone https://github.com/fraiseql/examples/realtime-collaboration
cd realtime-collaboration

# Set up environment
cp .env.example .env
uv sync

# Start services
docker-compose up -d postgres redis nats

# Run migrations
confiture migrate

# Start FraiseQL server
fraiseql run

# Visit the editor
open http://localhost:3000

Real-Time Features
Complete subscriptions reference.
NATS Integration
Event streaming and pub/sub patterns.
Performance Optimization
Batching and caching strategies.
Deployment
Kubernetes with sticky WebSocket sessions.