Arrow Dataplane
Arrow Dataplane — Columnar streaming for analytics
fraiseql-wire is a standalone Rust client library for streaming large JSON result sets from PostgreSQL with bounded memory. It connects directly to your database using the PostgreSQL wire protocol — a purpose-built alternative to general-purpose drivers when you need bulk reads without materializing the entire result set in memory.
| Scenario | Standard GraphQL | Wire |
|---|---|---|
| Interactive UI queries | Recommended | Overhead |
| < 10,000 rows | Recommended | Overkill |
| 10,000 – 100,000 rows | Acceptable | Recommended |
| > 100,000 rows | May OOM | Required |
| ETL pipelines | Awkward | Ideal |
| Bulk data exports | Memory issues | Ideal |
| Analytics workloads | — | Use Arrow instead |
cargo add fraiseql-wireRequires Rust 1.75+ and PostgreSQL 10+. No C dependencies — pure Rust via rustls (no OpenSSL required).
use fraiseql_wire::FraiseClient;
// Simplest: connection stringlet client = FraiseClient::connect("postgres://user:pass@localhost/mydb").await?;
// With TLSlet client = FraiseClient::connect("postgres://user:pass@host/mydb?sslmode=require").await?;use futures::StreamExt;
let mut stream = client .query::<serde_json::Value>("orders") // streams from v_orders view .execute() .await?;
while let Some(row) = stream.next().await { let json = row?; println!("{json}");}The generic type parameter affects only deserialization — it has no effect on the SQL sent to PostgreSQL.
#[derive(serde::Deserialize)]struct Order { id: String, total: f64, status: String,}
let mut stream = client .query::<Order>("orders") .execute() .await?;
while let Some(result) = stream.next().await { let order: Order = result?; println!("Order {}: ${}", order.id, order.total);}fraiseql-wire reads from views named v_{entity} that expose a single column named data of type json or jsonb. This is the same naming convention FraiseQL uses:
-- Required view structureCREATE VIEW v_orders ASSELECT jsonb_build_object( 'id', o.id, 'total', o.total, 'status', o.status) AS dataFROM tb_order o;tv_{entity} (time-series views) are intentionally not supported.
Wire supports hybrid filtering — SQL predicates reduce wire traffic, Rust predicates refine locally:
let stream = client .query::<serde_json::Value>("orders") // Evaluated in PostgreSQL — reduces rows returned .where_sql("data->>'status' = 'shipped'") // Evaluated in Rust — applied after PostgreSQL delivers rows .where_rust(|json| { json["total"].as_f64().unwrap_or(0.0) > 500.0 }) .execute() .await?;For complex predicates, use the WhereOperator enum instead of raw SQL strings:
use fraiseql_wire::operators::{WhereOperator, Field, Value};
let filter = WhereOperator::Gt( Field::new("data->>'total'"), Value::Float(500.0),);
let stream = client .query::<serde_json::Value>("orders") .where_operator(filter) .execute() .await?;25+ operators are available, including array operations, string matching (ILIKE, LIKE), null checks, pgvector distances (L2, cosine, inner product), full-text search, and IP network operations.
let stream = client .query::<serde_json::Value>("orders") .order_by("data->>'created_at' DESC") .limit(1000) .offset(500) .execute() .await?;With COLLATE for locale-aware string sorting:
.order_by("data->>'name' ASC COLLATE \"C\"")// Only request the fields you need — reduces wire traffic.select_projection( "jsonb_build_object('id', data->>'id', 'total', data->>'total')")fraiseql-wire maintains O(chunk_size) memory regardless of result size. A 1-million-row query uses the same memory as a 100-row query.
let stream = client .query::<serde_json::Value>("large_table") .chunk_size(512) // Items buffered at once (default: 256) .max_memory(500_000_000) // 500 MB hard limit — returns error if exceeded .adaptive_chunking(true) // Auto-optimize based on channel pressure (default: on) .adaptive_min_size(16) // Never shrink below 16 items .adaptive_max_size(1024) // Never grow above 1024 items .execute() .await?;| Rows | tokio-postgres | fraiseql-wire |
|---|---|---|
| 10,000 | 2.6 MB | 1.3 KB |
| 100,000 | 26 MB | 1.3 KB |
| 1,000,000 | 260 MB | 1.3 KB |
Memory stays constant at approximately chunk_size × avg_row_size.
For advanced flow control — for example, to throttle consumption or coordinate with a downstream system:
let mut stream = client.query::<T>("orders").execute().await?;
// Process some rowswhile let Some(item) = stream.next().await { process(item?); break;}
// Pause production — PostgreSQL stops sending rowsstream.pause().await?;
// ... do other work ...
// Resume — continues from exactly where it stoppedstream.resume().await?;
// Inspect current stream statelet stats = stream.stats();println!("Rows yielded: {}", stats.rows_yielded);postgres://[user[:password]@][host][:port][/database][?param=value...]Supported parameters:
| Parameter | Example | Description |
|---|---|---|
sslmode | require | disable, require, verify-ca, verify-full |
sslrootcert | /path/to/ca.pem | Custom CA certificate |
sslcert | /path/to/cert.pem | Client certificate (mTLS) |
sslkey | /path/to/key.pem | Client key (mTLS) |
application_name | my-service | Visible in pg_stat_activity |
statement_timeout | 30000ms | Per-query timeout |
tcp_keepalives_idle | 300s | Keep-alive interval |
Unix socket connection: postgres:///mydb
For more control than connection strings allow:
use fraiseql_wire::ConnectionConfig;use std::time::Duration;
let config = ConnectionConfig::builder("mydb", "user") .password("secret") .connect_timeout(Duration::from_secs(10)) .statement_timeout(Duration::from_secs(30)) .keepalive_idle(Duration::from_secs(300)) .application_name("my-service") .build();
let client = FraiseClient::connect_with_config("postgres://host/mydb", config).await?;use fraiseql_wire::TlsConfig;
// Custom CA (server certificate verification)let tls = TlsConfig::builder() .verify_hostname(true) .ca_cert_path("/path/to/ca.pem")? .build()?;
// Mutual TLSlet tls = TlsConfig::builder() .ca_cert_path("/path/to/ca.pem")? .client_cert_path("/path/to/cert.pem")? .client_key_path("/path/to/key.pem")? .build()?;
let client = FraiseClient::connect_tls("postgres://host/db", tls).await?;Authentication is SCRAM-SHA-256 with channel binding (prevents MITM attacks). Implemented in pure Rust via rustls — no OpenSSL dependency.
fraiseql-wire doesn’t include a built-in pool. Integrate with deadpool or bb8:
[dependencies]deadpool = "0.12"fraiseql-wire = "0.1"// One FraiseClient per connection; pool manages the connections// See CONNECTION_POOLING.md in the fraiseql-wire repository for full examplesuse fraiseql_wire::Error;
match client.query::<T>("orders").execute().await { Err(Error::Connection(msg)) => eprintln!("Connection failed: {msg}"), Err(Error::Authentication(msg)) => eprintln!("Auth failed: {msg}"), Err(Error::Sql(msg)) => eprintln!("SQL error: {msg}"), Err(Error::MemoryLimitExceeded { limit, estimated_memory }) => { eprintln!("Memory limit {limit} exceeded (estimated {estimated_memory})") } Err(Error::Cancelled) => { /* stream was dropped */ } Ok(stream) => { /* use stream */ }}fraiseql-wire emits metrics via the metrics crate (framework-agnostic). Register a recorder (Prometheus, StatsD, etc.) before connecting:
| Metric | Type | Description |
|---|---|---|
fraiseql_wire_rows_yielded | Counter | Rows delivered to caller per entity |
fraiseql_wire_rows_filtered | Counter | Rows dropped by Rust predicate |
fraiseql_wire_query_duration_ms | Histogram | Full query duration |
fraiseql_wire_query_startup_duration_ms | Histogram | Time to first row |
fraiseql_wire_chunk_processing_duration_ms | Histogram | Per-chunk latency |
fraiseql_wire_estimated_memory_usage | Gauge | Current memory estimate |
fraiseql_wire_current_chunk_size | Gauge | Active chunk size (adaptive) |
fraiseql_wire_stream_buffered_items | Gauge | Items waiting in buffer |
fraiseql_wire_query_errors_total | Counter | Errors by entity and category |
| Feature | Wire | Arrow |
|---|---|---|
| Format | JSON (row-oriented) | Columnar binary |
| Memory | Bounded (O(chunk_size)) | Bounded |
| Speed | 100K–500K rows/sec | Higher throughput |
| Compatibility | Any Rust async code | Arrow Flight SDK |
| Use case | ETL, bulk export, services | Analytics, ML, data science |
| Client languages | Rust | Python, TypeScript, Go, Rust, R, Julia |
Error::Connection("Connection refused")SELECT privileges on the viewError::Authentication("SCRAM authentication failed")pg_hba.conf allows SCRAM-SHA-256 for this user and hostdata column not foundError::InvalidSchema("Expected column 'data', got: ...")The view must expose exactly one column named data of type json or jsonb. See View Convention above.
Error::MemoryLimitExceeded { limit, estimated_memory }Increase max_memory() or reduce chunk_size() to lower per-chunk footprint:
.chunk_size(64).max_memory(1_000_000_000) // 1 GBWHERE columns in the underlying tableEXPLAIN ANALYZE on the view query.select_projection(...) to reduce row payload sizestatement_timeout — set it higher than your longest expected export:
postgres://host/db?statement_timeout=600000msArrow Dataplane
Arrow Dataplane — Columnar streaming for analytics
Pagination
Pagination — Cursor and offset pagination via GraphQL
Performance
Performance — Query and schema optimization