Skip to content

Real-Time Collaborative Editor

A complete example of a Google Docs-like collaborative editor using FraiseQL and NATS event streaming.

Repository: github.com/fraiseql/examples/realtime-collaboration

  • Real-Time Subscriptions: Live updates as other users edit
  • Conflict Resolution: Operational Transformation (OT) for concurrent edits
  • Presence: See who’s currently editing
  • Permissions: Share documents with specific permissions
  • History: Full edit history with timestamps
  • Cursor Tracking: See other users’ cursors in real-time
  • Comments: Thread-based discussions on document sections
CREATE TABLE tb_document (
pk_document SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
identifier TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
fk_owner INTEGER NOT NULL REFERENCES tb_user(pk_user),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE tb_document_share (
pk_share SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
fk_document INTEGER NOT NULL REFERENCES tb_document(pk_document),
fk_user INTEGER NOT NULL REFERENCES tb_user(pk_user),
permission TEXT NOT NULL DEFAULT 'view', -- 'view', 'edit', 'admin'
shared_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(fk_document, fk_user)
);
CREATE TABLE tb_document_edit (
pk_edit SERIAL PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
fk_document INTEGER NOT NULL REFERENCES tb_document(pk_document),
fk_user INTEGER NOT NULL REFERENCES tb_user(pk_user),
operation JSONB NOT NULL, -- { type: "insert", position: 100, content: "text" }
timestamp TIMESTAMPTZ DEFAULT now()
);
-- Views with JSONB data column
CREATE VIEW v_document AS
SELECT
d.id,
d.identifier,
jsonb_build_object(
'id', d.id::text,
'identifier', d.identifier,
'title', d.title,
'content', d.content,
'owner_id', u.id::text,
'created_at', d.created_at,
'updated_at', d.updated_at
) AS data
FROM tb_document d
JOIN tb_user u ON u.pk_user = d.fk_owner;
CREATE VIEW v_document_edit AS
SELECT
e.id,
jsonb_build_object(
'id', e.id::text,
'document_id', d.id::text,
'user_id', u.id::text,
'operation', e.operation,
'timestamp', e.timestamp
) AS data,
e.fk_document
FROM tb_document_edit e
JOIN tb_document d ON d.pk_document = e.fk_document
JOIN tb_user u ON u.pk_user = e.fk_user;
import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
@fraiseql.type
class Document:
id: ID
title: str
content: str
owner_id: ID
created_at: datetime
updated_at: datetime
@fraiseql.type
class DocumentShare:
id: ID
document_id: ID
user_id: ID
permission: str # "view", "edit", "admin"
shared_at: datetime
@fraiseql.type
class DocumentEdit:
id: ID
document_id: ID
user_id: ID
operation: dict # { type: "insert", position: 100, content: "text" }
timestamp: datetime
@fraiseql.type
class DocumentPresence:
user_id: ID
cursor_position: int
selection_start: int
selection_end: int
# Subscribe to document changes
subscription {
documentChanged(documentId: "doc123") {
id
content
updatedAt
}
}
# Subscribe to presence (who's editing)
subscription {
presenceUpdated(documentId: "doc123") {
userId
cursorPosition
user { name email }
}
}
# Subscribe to comments
subscription {
commentAdded(documentId: "doc123") {
id
content
author { name }
createdAt
}
}
# Apply an edit operation
mutation {
applyEdit(
documentId: "doc123"
operation: {
type: "insert"
position: 150
content: "new text"
}
) {
id
operation
timestamp
}
}
# Update cursor/selection position
mutation {
updatePresence(
documentId: "doc123"
cursorPosition: 250
selectionStart: 200
selectionEnd: 300
) {
userId
cursorPosition
}
}
# Add comment to document range
mutation {
addComment(
documentId: "doc123"
startPosition: 100
endPosition: 150
content: "This needs clarification"
) {
id
content
author { name }
}
}
from operational_transform import apply_operation, transform_operations
@fraiseql.mutation
async def apply_edit(info, document_id: str, operation: dict) -> DocumentEdit:
"""Apply edit with automatic conflict resolution."""
user_id = get_current_user(info)
# Get current document and pending operations
doc = await ctx.db.query_one(
"SELECT pk_document, content FROM tb_document WHERE id = $1",
[document_id]
)
pending_ops = await ctx.db.query(
"""
SELECT operation FROM tb_document_edit
WHERE fk_document = $1 AND timestamp > $2
ORDER BY timestamp ASC
""",
[doc['pk_document'], operation['timestamp']]
)
# Transform operation against pending operations
transformed_op = operation
for pending_op in pending_ops:
transformed_op = transform_operations(
transformed_op,
pending_op['operation']
)
# Apply transformed operation to document
new_content = apply_operation(doc['content'], transformed_op)
# Save operation and updated document
await ctx.db.execute(
"""
UPDATE tb_document
SET content = $1, updated_at = NOW()
WHERE pk_document = $2
""",
[new_content, doc['pk_document']]
)
edit = await ctx.db.query_one(
"""
INSERT INTO tb_document_edit (fk_document, fk_user, operation)
VALUES ($1, (SELECT pk_user FROM tb_user WHERE id = $2), $3::jsonb)
RETURNING id, operation, timestamp
""",
[doc['pk_document'], user_id, json.dumps(transformed_op)]
)
# Broadcast to other connected users
await fraiseql.nats.publish(
subject=f"documents:changed:{document_id}",
data={
'document_id': document_id,
'operation': transformed_op,
'user_id': user_id,
'content': new_content
}
)
return DocumentEdit(**edit)
@fraiseql.subscription
async def document_changed(info, document_id: str):
"""Subscribe to document changes."""
user_id = get_current_user(info)
# Check permission
share = await ctx.db.query_one(
"""
SELECT permission FROM tb_document_share
WHERE fk_document = (SELECT pk_document FROM tb_document WHERE id = $1)
AND fk_user = (SELECT pk_user FROM tb_user WHERE id = $2)
""",
[document_id, user_id]
)
if not share:
raise PermissionError("You don't have access to this document")
async for event in fraiseql.nats.subscribe(f'documents:changed:{document_id}'):
yield {
'id': event['document_id'],
'content': event['content'],
'updatedAt': datetime.utcnow()
}
@fraiseql.mutation
async def update_presence(
info,
document_id: str,
cursor_position: int,
selection_start: int = 0,
selection_end: int = 0
) -> DocumentPresence:
"""Update user's cursor/selection and broadcast."""
user_id = get_current_user(info)
# Store presence in Redis (ephemeral, expires after 1 hour)
await redis.setex(
f"presence:{document_id}:{user_id}",
3600,
json.dumps({
'cursor_position': cursor_position,
'selection_start': selection_start,
'selection_end': selection_end,
'updated_at': datetime.utcnow().isoformat()
})
)
# Broadcast presence update
await fraiseql.nats.publish(
subject=f"documents:presence:{document_id}",
data={
'user_id': user_id,
'cursor_position': cursor_position,
'selection_start': selection_start,
'selection_end': selection_end
}
)
return DocumentPresence(
user_id=user_id,
cursor_position=cursor_position,
selection_start=selection_start,
selection_end=selection_end
)
@fraiseql.subscription
async def presence_updated(info, document_id: str):
"""Subscribe to presence updates (cursors, selections)."""
async for event in fraiseql.nats.subscribe(f'documents:presence:{document_id}'):
yield {
'userId': event['user_id'],
'cursorPosition': event['cursor_position'],
'selectionStart': event['selection_start'],
'selectionEnd': event['selection_end']
}
@fraiseql.mutation
async def share_document(
info,
document_id: str,
user_id: str,
permission: str
) -> DocumentShare:
"""Share document with another user."""
current_user_id = get_current_user(info)
# Verify ownership
doc = await ctx.db.query_one(
"""
SELECT pk_document FROM tb_document
WHERE id = $1
AND fk_owner = (SELECT pk_user FROM tb_user WHERE id = $2)
""",
[document_id, current_user_id]
)
if not doc:
raise PermissionError("You can't share documents you don't own")
share = await ctx.db.query_one(
"""
INSERT INTO tb_document_share (fk_document, fk_user, permission)
VALUES (
$1,
(SELECT pk_user FROM tb_user WHERE id = $2),
$3
)
ON CONFLICT (fk_document, fk_user) DO UPDATE SET permission = EXCLUDED.permission
RETURNING id, permission, shared_at
""",
[doc['pk_document'], user_id, permission]
)
# Notify recipient
await fraiseql.nats.publish(
subject="share:received",
data={
'document_id': document_id,
'user_id': user_id,
'permission': permission
}
)
return DocumentShare(**share)
@fraiseql.query(sql_source="v_document_edit")
def document_history(document_id: str, first: int = 50) -> list[DocumentEdit]:
"""Get edit history for a document in chronological order."""
pass
  1. Clone the example

    Terminal window
    git clone https://github.com/fraiseql/examples/realtime-collaboration
    cd realtime-collaboration
  2. Set up environment

    Terminal window
    cp .env.example .env
    uv sync
  3. Start services

    Terminal window
    docker-compose up -d postgres redis nats
  4. Run migrations

    Terminal window
    confiture migrate
  5. Start FraiseQL server

    Terminal window
    fraiseql run
  6. Visit the editor

    Terminal window
    open http://localhost:3000
  1. Basic: Create/read/share documents
  2. Real-Time: Add subscriptions for live updates
  3. Collaboration: Implement OT for conflict resolution
  4. Presence: Track cursors and selections
  5. Advanced: Performance optimization with batching

Real-Time Features

Complete subscriptions reference.

NATS Integration

Event streaming and pub/sub patterns.

Performance Optimization

Batching and caching strategies.

Deployment

Kubernetes with sticky WebSocket sessions.