stemedb/docs/research/distributed-write-path.md
jordan 55349845d0 refactor: Split all files to enforce 500-line max
Break monolith source files into focused modules:
- stemedb-core/types.rs → types/ directory (assertion, source, gold_standard, etc.)
- stemedb-storage: audit_store, quota_store, trust_rank_store, vector_index, vote_store → module directories
- stemedb-ingest/worker.rs → worker/ with separate test modules
- stemedb-query: engine, materializer, query → module directories
- stemedb-lens: epoch_aware, skeptic → module directories
- stemedb-sim/lib.rs → agent, arenas/, helpers, runner, strategy, types
- stemedb-api/tests: integration_tests → http_basic, http_validation, http_epoch, http_pipeline
- stemedb-api/tests: e2e_flow_test → e2e_full_pipeline, e2e_lens_resolution
- stemedb-query/tests: e2e_pipeline → e2e_pipeline + e2e_decay

Also adds new features: gold standard verification, escalation handlers,
admin endpoints, concept hierarchy spec, arena roadmap, and Go SDK.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 01:13:45 -07:00

1246 lines
41 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Distributed Write Path Research: CockroachDB/Spanner-Style for Episteme
**Date:** 2026-02-01
**Status:** Research Complete
**Target:** Design simplest correct distributed write path for append-only knowledge graph
---
## Executive Summary
After comprehensive research into Google Spanner, CockroachDB, Hybrid Logical Clocks, and distributed append-only systems, **we can implement a significantly simpler architecture than traditional distributed databases** because:
1. **Assertions are immutable** - no read-write conflicts
2. **Content-addressed** - automatic deduplication
3. **Read-time conflict resolution** - no distributed transactions needed
4. **Natural partition keys** - subject/predicate provide excellent sharding boundaries
**Recommendation:** Use **Hybrid Logical Clocks (HLC) + Single-Writer-Per-Partition + Simple Raft Replication** instead of full Spanner-style distributed transactions.
---
## 1. Google Spanner's Architecture
### Core Components
**TrueTime API**
- GPS + atomic clock synchronization providing <1ms uncertainty in 99th percentile
- Returns time interval `[earliest, latest]` guaranteed to contain actual time
- Enables external consistency without distributed coordination
**External Consistency Guarantee**
- If transaction T1 completes before T2 starts committing, then `timestamp(T2) > timestamp(T1)`
- Achieved via **commit-wait**: system waits until `TT.now().earliest > commit_timestamp`
- Guarantees serializable transactions visible globally
**Paxos Groups**
- Data partitioned into ranges, each range replicated via Paxos
- Writes flow: Client Gateway Paxos Leader Replicas
- Read-only transactions skip locks (use snapshot isolation)
**Key Insight for Episteme:** Spanner's complexity stems from mutable records requiring distributed transactions. We don't have mutable records.
**Sources:**
- [Spanner TrueTime Documentation](https://docs.cloud.google.com/spanner/docs/true-time-external-consistency)
- [Strict Serializability in Spanner](https://cloud.google.com/blog/products/databases/strict-serializability-and-external-consistency-in-spanner)
- [Life of Spanner Reads & Writes](https://docs.cloud.google.com/spanner/docs/whitepapers/life-of-reads-and-writes)
---
## 2. CockroachDB's Distributed Write Path
### Architecture Layers
```
Client Request
Transaction Coordinator (TxnCoordSender)
Distribution Layer (DistSender) - routes to range leaseholder
Replication Layer (Raft) - consensus among replicas
Storage Layer (RocksDB) - persistent KV store
```
### Key Optimizations
**Parallel Commits**
- Reduces commit latency by 50% (from 2 consensus rounds to 1)
- Introduces `STAGING` transaction status
- Writes final batch and transaction record in parallel
- Improved p50 latency by 47%, throughput by 72% in benchmarks
**Transaction Pipelining**
- Combines with parallel commits for near-theoretical minimum latency
- Total latency = sum(read latencies) + 1 consensus round
- Trade-off: Extra WAL write for STAGING status
**MultiRaft**
- Each range has own consensus group
- Nodes participate in thousands of concurrent Raft groups
- Leaseholder serves reads, coordinates writes
**DistSender Intelligence**
- Batches requests to same range
- Routes to current leaseholder (cached)
- Handles range splits/merges transparently
**Key Insight for Episteme:** CockroachDB's complexity handles multi-statement ACID transactions across ranges. Single assertion writes are much simpler.
**Sources:**
- [Parallel Commits Protocol](https://www.cockroachlabs.com/blog/parallel-commits/)
- [Transaction Pipelining](https://www.cockroachlabs.com/blog/transaction-pipelining/)
- [Replication Layer Documentation](https://www.cockroachlabs.com/docs/stable/architecture/replication-layer)
- [Life of a Distributed Transaction](https://www.cockroachlabs.com/docs/stable/architecture/life-of-a-distributed-transaction)
---
## 3. Hybrid Logical Clocks (HLC)
### Why HLC Instead of TrueTime
TrueTime requires GPS/atomic clocks (expensive infrastructure). HLC achieves similar guarantees using NTP + logical counters.
### HLC Structure
```rust
struct HLC {
physical: u64, // Wall clock time (NTP-synchronized)
logical: u64, // Counter for same physical timestamp
node_id: u64, // Tie-breaker for total ordering
}
```
### Properties
1. **Causality Preservation:** If event A happens-before event B, then `HLC(A) < HLC(B)`
2. **Close to Physical Time:** Physical component stays within NTP bounds (~10-100ms typically)
3. **Monotonic:** Timestamps never decrease, even with clock skew
4. **No Clock Updates:** HLC reads but never modifies system clock
### How It Works
```rust
// On local event
fn generate_timestamp() -> HLC {
let now = system_time();
if now > last_hlc.physical {
HLC { physical: now, logical: 0, node_id }
} else {
HLC { physical: last_hlc.physical, logical: last_hlc.logical + 1, node_id }
}
}
// On receiving message with remote timestamp
fn update_on_receive(remote_hlc: HLC) -> HLC {
let now = system_time();
let physical = max(now, remote_hlc.physical, last_hlc.physical);
let logical = if physical == last_hlc.physical {
last_hlc.logical + 1
} else if physical == remote_hlc.physical {
remote_hlc.logical + 1
} else {
0
};
HLC { physical, logical, node_id }
}
```
### Clock Skew Handling
- CockroachDB default max offset: 500ms
- Nodes self-evict if skew exceeds threshold
- NTP typically achieves <10ms on LAN, <100ms on WAN
### Rust Implementations
1. **uhlc-rs** (atolab/uhlc-rs)
- Production-ready, used in distributed systems
- Unique identifier per clock
- Monotonic timestamps close to physical time
2. **hlc-rs** (tbg/hlc-rs)
- Simpler implementation
- Good starting point
3. **hlc-gen**
- Lock-free implementation
- `#[no_std]` compatible
- High throughput
**Recommendation:** Start with `uhlc-rs` - battle-tested and well-documented.
**Sources:**
- [Hybrid Logical Clocks Paper](https://cse.buffalo.edu/~demirbas/publications/hlc.pdf)
- [uhlc-rs Implementation](https://github.com/atolab/uhlc-rs)
- [HLC in CockroachDB](https://www.yugabyte.com/blog/evolving-clock-sync-for-distributed-databases/)
- [Clock Skew Management](https://sergeiturukin.com/2017/06/26/hybrid-logical-clocks.html)
---
## 4. The Append-Only Advantage
### What We Skip
**No Distributed Transactions**
- Single assertion write = single consensus operation
- No two-phase commit
- No transaction coordinator
- No deadlock detection
**No Read-Write Conflicts**
- Assertions never mutate
- No need for pessimistic locking
- No MVCC tombstones
- Reads never block writes
**No Transaction Isolation Complexity**
- No serializable snapshot isolation
- No read-write conflict tracking
- No transaction retry logic
**Automatic Deduplication**
- Content addressing: identical assertions have same ID
- Write is idempotent: `store(hash(content), content)`
- Replaying writes is safe
### What We Keep
**Ordering Guarantees**
- Supersession chains require causal ordering
- HLC provides total ordering across cluster
- Example: Assertion A2 supersedes A1, must ensure `timestamp(A2) > timestamp(A1)`
**Durability**
- Still need WAL + fsync
- Still need replication for fault tolerance
- Still need crash recovery
**Consistency**
- Raft consensus ensures replicas agree
- Leader election on failure
- No split-brain scenarios
### Comparison: Full Spanner vs Episteme
| Feature | Spanner/CockroachDB | Episteme (Append-Only) |
|---------|---------------------|------------------------|
| Write path | TxnCoordSender DistSender Raft | Direct Raft |
| Transaction coordinator | Required | Not needed |
| Distributed locks | Yes (pessimistic) | No locks needed |
| Two-phase commit | Yes | No |
| Intent resolution | Complex | Not applicable |
| Read-write conflicts | Must detect and resolve | Cannot occur |
| Write amplification | High (intents + resolution) | Low (direct append) |
| Latency | 2-3 network hops | 1 network hop |
**Key Insight:** Immutability eliminates entire classes of distributed systems problems.
**Sources:**
- [Append-Only Logs](https://medium.com/@komalshehzadi/append-only-logs-the-immutable-diary-of-data-58c36a871c7c)
- [Immutability Changes Everything (ACM)](https://queue.acm.org/detail.cfm?id=2884038)
- [The Rise of Immutable Data Stores](https://www.odbms.org/2015/10/the-rise-of-immutable-data-stores/)
- [Data Replication in Distributed Systems](https://candost.blog/books/data-replication-in-distributed-systems/)
---
## 5. Sharding Strategy
### Options Analysis
#### 5.1 Hash-Based Sharding (by Assertion ID)
```
Shard = BLAKE3(assertion_content) % num_shards
```
**Pros:**
- Uniform distribution
- No hotspots
- Simple implementation
**Cons:**
- Related assertions (same subject) scattered across shards
- Query "all claims about subject X" requires scatter-gather
- No data locality
**Verdict:** Poor fit for knowledge graph queries
---
#### 5.2 Range-Based Sharding (by Subject)
```
Shard_A: subjects [aaa, bbb)
Shard_B: subjects [bbb, ccc)
Shard_C: subjects [ccc, zzz]
```
**Pros:**
- All assertions about subject X on same shard
- Range queries efficient
- Good for "tell me everything about X"
**Cons:**
- Hotspot subjects (e.g., "USA", "COVID-19")
- Manual range assignment
- Complex rebalancing
**Verdict:** Good for queries, problematic for hot subjects
---
#### 5.3 Hash-Based by Subject (Recommended)
```
Shard = hash(subject_uri) % num_shards
```
**Pros:**
- All assertions about subject X on same shard
- Automatic load distribution
- No manual range management
- Simple shard assignment
**Cons:**
- Predicate-based queries still scatter-gather
- Rebalancing requires re-hashing (use consistent hashing)
**Verdict:** Best balance for Episteme
---
#### 5.4 Composite: Subject + Predicate Hash
```
Shard = hash(subject_uri || predicate_uri) % num_shards
```
**Pros:**
- Even more specific data locality
- "Subject X with predicate P" queries hit single shard
**Cons:**
- "All claims about subject X" now scatters
- More complex query routing
**Verdict:** Only if predicate-specific queries dominate workload
---
### Recommended Strategy
**Primary: Hash-based sharding by Subject**
```rust
fn assign_shard(assertion: &Assertion, num_shards: u32) -> u32 {
let hash = blake3::hash(assertion.subject.as_bytes());
u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}
```
**With Consistent Hashing for Rebalancing:**
Use jump hash or consistent hash ring to minimize data movement when adding/removing shards.
```rust
// Jump hash provides minimal key redistribution
fn jump_hash(key: u64, num_buckets: u32) -> u32 {
let mut b = -1i64;
let mut j = 0i64;
let mut k = key;
while j < num_buckets as i64 {
b = j;
k = k.wrapping_mul(2862933555777941757).wrapping_add(1);
j = ((b.wrapping_add(1)) as f64 * (f64::powi(2.0, 31) / ((k >> 33).wrapping_add(1)) as f64)) as i64;
}
b as u32
}
```
**Sources:**
- [Database Sharding Strategies](https://engineeringatscale.substack.com/p/system-design-concepts-dive-deep)
- [Sharding Types (PlanetScale)](https://planetscale.com/blog/types-of-sharding)
- [Consistent Hashing Guide](https://www.toptal.com/big-data/consistent-hashing)
- [YugabyteDB Sharding Analysis](https://www.yugabyte.com/blog/four-data-sharding-strategies-we-analyzed-in-building-a-distributed-sql-database/)
- [RDF Triple Store Partitioning](https://www.iaria.org/conferences2018/filesDBKDA18/IztokSavnik_Tutorial_3store-arch.pdf)
---
## 6. Write Amplification and Performance
### Raft Write Overhead
**Standard Raft Write Path:**
1. Client sends write to leader
2. Leader appends to local WAL (fsync)
3. Leader sends AppendEntries RPC to followers
4. Followers append to local WAL (fsync)
5. Followers respond to leader
6. Leader commits when quorum reached
7. Leader responds to client
**Cost Analysis:**
- **Network:** 1 round-trip (leader followers)
- **Disk:** 1 fsync at leader + (replication_factor - 1) fsyncs at followers
- **Write Amplification:** `replication_factor` (typically 3x-5x)
### Batching Optimizations
**Batch AppendEntries:**
Instead of sending RPC per write, batch N writes into single RPC.
```rust
// Naive: 1000 writes = 1000 RPCs
for write in writes {
leader.append_entries(vec![write]).await?;
}
// Optimized: 1000 writes = 1 RPC
leader.append_entries(writes).await?;
```
**Performance Impact (from YugabyteDB):**
- Throughput: 2x improvement with batching
- Latency: 30% reduction for p99
**Write Buffering:**
Buffer writes in memory, flush when:
- Buffer reaches size threshold (e.g., 1MB)
- Time threshold reached (e.g., 10ms)
- Explicit flush request
**Trade-off:** Latency vs throughput
- Small batches (10ms): Lower latency, moderate throughput
- Large batches (100ms): Higher latency, maximum throughput
### Pipeline Optimization
**CockroachDB's Parallel Commits Approach:**
Standard 2-phase commit:
```
Write intents → Commit record → Resolve intents
(1 RTT) (1 RTT) (async)
Total: 2 RTT
```
Parallel commits:
```
Write intents + Commit record (parallel)
(1 RTT)
Total: 1 RTT
```
**Episteme Equivalent:**
We don't have intents, but we can pipeline:
```rust
// Sequential: 2 consensus rounds
let wal_entry = wal.append(assertion).await?; // Round 1
let index_update = index.update(assertion).await?; // Round 2
// Pipelined: 1 consensus round
let (wal_entry, index_update) = tokio::join!(
wal.append(assertion),
index.update(assertion),
);
```
### Expected Performance Numbers
**Local Write (Single Node):**
- Latency: 1-5ms (dominated by fsync)
- Throughput: 10K-50K writes/sec (NVMe SSD)
**Distributed Write (3 Replicas, Same DC):**
- Latency: 5-15ms (1 RTT + fsync)
- Throughput: 5K-20K writes/sec per shard
**Distributed Write (3 Replicas, Multi-Region):**
- Latency: 50-200ms (depends on geography)
- Throughput: 1K-5K writes/sec per shard
**Batched Writes (100 assertions/batch):**
- Throughput: 100K-500K assertions/sec per shard
**With 100 Shards:**
- Aggregate throughput: 10M-50M assertions/sec
**Sources:**
- [Raft Consensus Algorithm](https://raft.github.io/)
- [Fast Raft Optimizations](https://arxiv.org/html/2506.17793v1)
- [YugabyteDB Write Buffering](https://dev.to/yugabyte/write-buffering-to-reduce-raft-consensus-latency-in-yugabytedb-2dg6)
- [Improved Raft with Batch Processing](https://link.springer.com/chapter/10.1007/978-981-19-2456-9_44)
---
## 7. Concrete Architecture Proposal
### The Simplest Correct Design
```
┌─────────────────────────────────────────────────────────┐
│ Agent (Writer) │
│ │
│ 1. Generate Assertion │
│ 2. Sign with Ed25519 │
│ 3. Compute Content Hash (BLAKE3) │
└────────────────┬────────────────────────────────────────┘
│ HTTP POST /assertions
┌─────────────────────────────────────────────────────────┐
│ Gateway Node (Any) │
│ │
│ 1. Validate signature │
│ 2. Compute shard: hash(subject) % num_shards │
│ 3. Route to shard leader │
└────────────────┬────────────────────────────────────────┘
│ gRPC → Shard Leader
┌─────────────────────────────────────────────────────────┐
│ Shard N (3-5 Replicas) │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Leader Node │ │
│ │ │ │
│ │ 1. Generate HLC timestamp │ │
│ │ 2. Append to WAL (fsync) │ │
│ │ 3. Propose to Raft group │◄──────────┤── Raft RPCs
│ │ 4. Wait for quorum │ │
│ │ 5. Apply to KV store │ │
│ │ 6. Update indexes (async) │ │
│ │ 7. Respond to gateway │ │
│ └──────────────────────────────────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Follower 1 │ │ Follower 2 │ │ Follower 3 │ │
│ │ │ │ │ │ │ │
│ │ - Replicate │ │ - Replicate │ │ - Replicate │ │
│ │ - Apply │ │ - Apply │ │ - Apply │ │
│ │ - Ready for │ │ - Ready for │ │ - Ready for │ │
│ │ failover │ │ failover │ │ failover │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
```
### Component Breakdown
#### 7.1 Gateway Layer (Stateless)
**Responsibilities:**
- Accept HTTP/gRPC requests
- Authenticate agents (verify Ed25519 signatures)
- Route requests to correct shard
- Cache shard topology
- Return responses to clients
**Implementation:**
```rust
use axum::{Router, routing::post};
use blake3::Hash;
struct Gateway {
shard_map: Arc<RwLock<ShardMap>>,
http_client: reqwest::Client,
}
async fn handle_assertion(
State(gateway): State<Gateway>,
Json(assertion): Json<Assertion>,
) -> Result<StatusCode, ApiError> {
// 1. Validate signature
assertion.verify_signature()?;
// 2. Compute shard
let shard_id = compute_shard(&assertion.subject, gateway.shard_map.read().await.num_shards);
// 3. Find leader
let leader_addr = gateway.shard_map.read().await.get_leader(shard_id)?;
// 4. Forward to leader
gateway.http_client
.post(format!("{}/internal/append", leader_addr))
.json(&assertion)
.send()
.await?;
Ok(StatusCode::CREATED)
}
fn compute_shard(subject: &str, num_shards: u32) -> u32 {
let hash = blake3::hash(subject.as_bytes());
u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}
```
**Scaling:** Run many gateway instances behind load balancer.
---
#### 7.2 Shard Leader (Stateful)
**Responsibilities:**
- Accept writes for this shard's key range
- Generate HLC timestamps
- Append to WAL
- Replicate via Raft
- Apply committed entries to storage
- Update indexes
- Handle leader election
**Implementation:**
```rust
use uhlc::HLC;
use tikv_raft::{RawNode, Config};
struct ShardLeader {
shard_id: u32,
hlc: Arc<Mutex<HLC>>,
wal: WriteAheadLog,
kv_store: KVStore,
raft_node: RawNode<MemStorage>,
index_store: IndexStore,
}
impl ShardLeader {
async fn append_assertion(&self, assertion: Assertion) -> Result<AssertionId, StorageError> {
// 1. Generate HLC timestamp
let timestamp = self.hlc.lock().await.new_timestamp();
// 2. Create versioned assertion
let versioned = VersionedAssertion {
assertion,
timestamp,
shard_id: self.shard_id,
};
// 3. Serialize
let bytes = serialize(&versioned)?;
// 4. Propose to Raft (this does WAL append + replication)
let propose_id = self.raft_node.propose(vec![], bytes)?;
// 5. Wait for commit (via channel or polling)
self.wait_for_commit(propose_id).await?;
// 6. Apply to KV store (idempotent)
let assertion_id = AssertionId::from_hash(blake3::hash(&bytes));
self.kv_store.put(assertion_id.as_bytes(), &bytes)?;
// 7. Update indexes (async, best-effort)
tokio::spawn({
let index_store = self.index_store.clone();
let versioned = versioned.clone();
async move {
let _ = index_store.index_assertion(&versioned).await;
}
});
Ok(assertion_id)
}
}
```
---
#### 7.3 Raft Replication (tikv/raft-rs)
**Configuration:**
```rust
use tikv_raft::{Config, RawNode, storage::MemStorage};
fn create_raft_node(node_id: u64, peers: Vec<u64>) -> RawNode<MemStorage> {
let config = Config {
id: node_id,
election_tick: 10,
heartbeat_tick: 3,
max_size_per_msg: 1024 * 1024, // 1MB
max_inflight_msgs: 256,
..Default::default()
};
let storage = MemStorage::new();
RawNode::new(&config, storage, peers).unwrap()
}
```
**Raft Message Handling:**
```rust
async fn run_raft_loop(raft_node: Arc<Mutex<RawNode<MemStorage>>>) {
let mut ticker = tokio::time::interval(Duration::from_millis(100));
loop {
ticker.tick().await;
let mut node = raft_node.lock().await;
node.tick();
// Process ready entries
if node.has_ready() {
let ready = node.ready();
// 1. Persist entries to WAL
if !ready.entries().is_empty() {
persist_entries(ready.entries()).await;
}
// 2. Send messages to peers
for msg in ready.messages {
send_to_peer(msg).await;
}
// 3. Apply committed entries
for entry in ready.committed_entries {
apply_entry(entry).await;
}
// 4. Advance Raft state
node.advance(ready);
}
}
}
```
---
#### 7.4 Storage Layer
**Write-Ahead Log:**
```rust
struct WriteAheadLog {
file: File,
current_offset: u64,
}
impl WriteAheadLog {
async fn append(&mut self, entry: &RaftEntry) -> Result<u64, io::Error> {
let bytes = serialize(entry)?;
let checksum = crc32c::crc32c(&bytes);
// Write: [len: u32][checksum: u32][data: bytes]
let record = [
&(bytes.len() as u32).to_le_bytes()[..],
&checksum.to_le_bytes()[..],
&bytes[..],
].concat();
self.file.write_all(&record).await?;
self.file.sync_all().await?; // fsync
let offset = self.current_offset;
self.current_offset += record.len() as u64;
Ok(offset)
}
}
```
**KV Store (RocksDB):**
```rust
use rocksdb::{DB, Options, WriteBatch};
struct KVStore {
db: Arc<DB>,
}
impl KVStore {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
self.db.put(key, value)
}
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, rocksdb::Error> {
self.db.get(key)
}
fn batch_put(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), rocksdb::Error> {
let mut batch = WriteBatch::default();
for (key, value) in entries {
batch.put(key, value);
}
self.db.write(batch)
}
}
```
---
#### 7.5 Index Store (Async Updates)
**Subject Index:**
```rust
struct SubjectIndex {
db: Arc<DB>,
}
impl SubjectIndex {
async fn index_assertion(&self, assertion: &VersionedAssertion) -> Result<(), StorageError> {
let subject_key = format!("subject:{}:{}", assertion.assertion.subject, assertion.timestamp);
self.db.put(subject_key.as_bytes(), assertion.assertion.id.as_bytes())?;
Ok(())
}
async fn query_subject(&self, subject: &str, limit: usize) -> Result<Vec<AssertionId>, StorageError> {
let prefix = format!("subject:{}:", subject);
let mut results = Vec::new();
let iter = self.db.prefix_iterator(prefix.as_bytes());
for (_, value) in iter.take(limit) {
results.push(AssertionId::from_bytes(&value)?);
}
Ok(results)
}
}
```
---
### 7.6 Failure Handling
**Leader Failure:**
1. Followers detect missing heartbeats (election_timeout)
2. Follower transitions to candidate, increments term
3. Candidate requests votes from peers
4. Majority votes becomes new leader
5. New leader sends heartbeats
6. Gateway detects new leader (via Raft status API)
**Follower Failure:**
1. Leader detects missing ACKs
2. Leader continues with remaining quorum
3. When follower recovers, catches up via log replication
**Network Partition:**
1. Minority partition cannot elect leader (no quorum)
2. Minority partition rejects writes
3. Majority partition continues serving writes
4. When partition heals, minority replays majority's log
**Consistency Guarantee:** Raft ensures linearizability (strongest consistency model).
---
### 7.7 What We Skip
**No Transaction Coordinator:**
- Single assertion = single Raft proposal
- No multi-shard transactions
**No Intent Resolution:**
- CockroachDB writes "intents" then resolves them
- We write final values directly
**No Read-Write Conflict Detection:**
- Assertions are immutable
- No conflict possible
**No Distributed Locks:**
- No lock table
- No deadlock detection
**No MVCC Garbage Collection:**
- Assertions never deleted
- No tombstones to clean up
---
## 8. Rust Crates to Use
### Core Dependencies
```toml
[dependencies]
# Consensus
raft = "0.7" # TiKV's Raft implementation
raft-proto = "0.7" # Protobuf definitions
# Hybrid Logical Clock
uhlc = "0.5" # Battle-tested HLC
# Storage
rocksdb = "0.21" # Embedded KV store
sled = "0.34" # Alternative: pure-Rust KV store
# Serialization
rkyv = "0.7" # Zero-copy serialization (already using)
blake3 = "1.5" # Content addressing (already using)
# Async runtime
tokio = { version = "1.35", features = ["full"] }
tokio-stream = "0.1"
# Networking
tonic = "0.10" # gRPC (Raft messages)
axum = "0.7" # HTTP gateway (already using)
tower = "0.4" # Middleware
# Error handling
anyhow = "1.0"
thiserror = "1.0"
# Observability
tracing = "0.1" # Already using
tracing-subscriber = "0.3"
metrics = "0.21"
```
### Optional (Future)
```toml
# If we need service discovery
etcd-client = "0.12"
# If we want built-in consensus
tikv-client = "0.2" # Use TiKV as storage backend
# If we need distributed tracing
opentelemetry = "0.21"
opentelemetry-jaeger = "0.20"
```
---
## 9. Implementation Phases
### Phase 1: Single-Node with HLC (1-2 weeks)
**Goal:** Prove HLC integration + baseline performance
```rust
// Extend current ingestion with HLC
struct IngestionPipeline {
hlc: Arc<Mutex<HLC>>,
wal: WriteAheadLog,
kv_store: KVStore,
}
impl IngestionPipeline {
async fn ingest(&self, assertion: Assertion) -> Result<AssertionId, Error> {
let timestamp = self.hlc.lock().await.new_timestamp();
let versioned = VersionedAssertion { assertion, timestamp };
// Existing WAL append logic
self.wal.append(&versioned).await?;
// Existing KV store logic
let id = AssertionId::from_content(&versioned);
self.kv_store.put(id.as_bytes(), &serialize(&versioned)?)?;
Ok(id)
}
}
```
**Tests:**
- HLC monotonicity under concurrent writes
- HLC causality preservation (ingest A, ingest B superseding A, verify timestamps)
- Performance baseline (writes/sec, p99 latency)
---
### Phase 2: Single-Shard with Raft Replication (3-4 weeks)
**Goal:** Fault-tolerant single shard (3 replicas)
**Architecture:**
```
Writer → Leader (with Raft) → Followers (2x)
```
**Implementation:**
1. Wrap `IngestionPipeline` with Raft layer
2. Leader proposes writes to Raft group
3. Followers replicate and apply
4. Leader responds when committed
**Tests:**
- Leader failure: kill leader, verify follower elected, writes continue
- Follower failure: kill follower, verify writes continue with 2/3 quorum
- Recovery: restart failed node, verify catches up
- Split-brain: network partition, verify minority rejects writes
---
### Phase 3: Multi-Shard with Gateway (4-6 weeks)
**Goal:** Horizontal scalability
**Architecture:**
```
Writers → Gateway Layer (stateless, N instances)
┌─────────┼─────────┐
Shard 0 Shard 1 Shard 2
(3 replicas each)
```
**Implementation:**
1. Gateway routes by `hash(subject) % num_shards`
2. Shard topology stored in etcd or Raft metadata
3. Gateway caches topology, refreshes on errors
4. Dynamic shard addition (consistent hashing)
**Tests:**
- Load balancing: verify writes distributed evenly
- Shard failover: kill entire shard, verify gateway routes to backup
- Rebalancing: add shard, verify rehashing redistributes data
- Hotspot handling: send 10K writes to same subject, verify throughput
---
### Phase 4: Production Hardening (ongoing)
**Observability:**
```rust
// Metrics
metrics::counter!("assertions_ingested_total", "shard" => shard_id.to_string());
metrics::histogram!("raft_commit_duration_ms", commit_duration.as_millis() as f64);
metrics::gauge!("raft_leader_id", leader_id as f64);
// Tracing
#[instrument(skip(self), fields(shard_id = %self.shard_id, assertion_id = %assertion.id))]
async fn append_assertion(&self, assertion: Assertion) -> Result<()> {
info!("Appending assertion");
// ...
}
```
**Chaos Testing:**
- Random node kills
- Network latency injection
- Disk failures
- Clock skew simulation
**Performance Tuning:**
- Batch size optimization (find sweet spot for latency vs throughput)
- RocksDB compaction tuning
- Raft message batching
- Index update batching
---
## 10. Key Design Decisions Summary
| Decision | Choice | Rationale |
|----------|--------|-----------|
| **Clock** | Hybrid Logical Clock (HLC) | Causality + physical time without GPS clocks |
| **Consensus** | Raft (tikv/raft-rs) | Simple, proven, excellent Rust implementation |
| **Sharding** | Hash by subject | Data locality for common queries |
| **Replication** | 3 replicas per shard | Tolerate 1 failure (quorum = 2/3) |
| **Storage** | RocksDB (via rust-rocksdb) | LSM tree, proven at scale, good Rust bindings |
| **Serialization** | rkyv (already using) | Zero-copy, fastest option |
| **Transactions** | None (single-assertion writes) | Append-only eliminates need |
| **Gateway** | Stateless HTTP/gRPC | Easy to scale horizontally |
---
## 11. Performance Expectations
### Single Shard (3 Replicas, Same Datacenter)
| Metric | No Batching | With Batching (100/batch) |
|--------|-------------|---------------------------|
| Write Latency (p50) | 5-10ms | 5-10ms |
| Write Latency (p99) | 15-30ms | 15-30ms |
| Throughput | 5K-20K/sec | 100K-500K/sec |
| Write Amplification | 3x (replication) | 3x |
### 100 Shards
| Metric | Value |
|--------|-------|
| Aggregate Throughput | 10M-50M assertions/sec |
| Storage Efficiency | ~70% (accounting for WAL, indexes) |
| Fault Tolerance | Lose any 1 replica per shard |
### Resource Requirements (per node)
| Resource | Requirement |
|----------|-------------|
| CPU | 8-16 cores |
| RAM | 32-64 GB |
| Disk | 1-4 TB NVMe SSD |
| Network | 10 Gbps |
---
## 12. Comparison: Traditional vs Episteme
### CockroachDB Transaction Flow
```
1. Client: BEGIN TRANSACTION
2. TxnCoordSender: Create transaction record
3. DistSender: Route to ranges, write intents
4. Raft: Replicate intents (2 RTT)
5. TxnCoordSender: Write commit record
6. Raft: Replicate commit (1 RTT)
7. TxnCoordSender: Resolve intents (async)
8. Client: Transaction complete
Total: ~3-4 RTT, complex state machine
```
### Episteme Assertion Flow
```
1. Client: POST /assertions
2. Gateway: Route to shard leader
3. Leader: Propose to Raft
4. Raft: Replicate and commit (1 RTT)
5. Leader: Apply to storage
6. Gateway: Return assertion ID
Total: ~1-2 RTT, simple state machine
```
**Latency Reduction:** 50-75% vs traditional distributed database
**Code Complexity:** ~80% less than CockroachDB transaction coordinator
---
## 13. Open Questions & Future Work
### Q1: How to handle supersession chains across shards?
**Problem:** Assertion A1 on shard X supersedes assertion A0 on shard Y. How to ensure ordering?
**Option A:** Require supersession to happen on same shard (reject if subjects differ)
**Option B:** Use HLC timestamps, accept eventual consistency for cross-shard supersession
**Option C:** Introduce lightweight cross-shard coordination (2PC only for supersession)
**Recommendation:** Start with Option B (eventual consistency). Most supersession chains will be same subject (same shard).
### Q2: How to query across multiple shards efficiently?
**Problem:** Query "all assertions about subjects in category X" may span shards.
**Option A:** Scatter-gather (query all shards, merge results)
**Option B:** Secondary index shard (index by predicate or category)
**Option C:** Materialized views pre-aggregated by common queries
**Recommendation:** Implement Option A first, optimize hot queries with Option C.
### Q3: What's the story for multi-region deployments?
**Problem:** Replicate across US-East, US-West, EU for low-latency reads globally.
**Option A:** Single Raft group across regions (high write latency)
**Option B:** Regional Raft groups with async replication (eventual consistency)
**Option C:** Spanner-style Paxos with commit-wait (requires TrueTime)
**Recommendation:** Start single-region, then explore Option B (similar to CockroachDB follower reads).
### Q4: How to handle hot shards?
**Problem:** Celebrity subject (e.g., "Donald Trump") receives 100K writes/sec.
**Option A:** Over-provision that shard (more CPU/disk)
**Option B:** Sub-shard by predicate (split "Donald Trump" + "nationality" vs "Donald Trump" + "age")
**Option C:** Use consistent hashing to split hot subject across multiple sub-shards
**Recommendation:** Monitor for hotspots, implement Option A initially, Option C if needed.
---
## 14. Recommended Reading
### Essential Papers
1. [Raft Consensus Algorithm](https://raft.github.io/raft.pdf) - Diego Ongaro & John Ousterhout
2. [Logical Physical Clocks (HLC)](https://cse.buffalo.edu/~demirbas/publications/hlc.pdf) - Kulkarni & Demirbas
3. [Spanner: Google's Globally-Distributed Database](https://research.google.com/archive/spanner-osdi2012.pdf) - Corbett et al.
### Blog Posts
4. [Life of a CockroachDB Transaction](https://www.cockroachlabs.com/blog/parallel-commits/)
5. [Building a Distributed Log from Scratch](https://bravenewgeek.com/building-a-distributed-log-from-scratch-part-2-data-replication/)
### Books
6. **Designing Data-Intensive Applications** - Martin Kleppmann (Chapters 5, 7, 9)
---
## Conclusion
**For Episteme's append-only, content-addressed knowledge graph, we can build a significantly simpler distributed write path than traditional databases:**
1. **Use HLC instead of TrueTime** - No specialized hardware needed
2. **Single-writer-per-partition** - Eliminates distributed transactions
3. **Raft for replication** - Simple, battle-tested consensus
4. **Hash-based sharding by subject** - Natural data locality
5. **Async index updates** - Decouple write path from read optimization
**Expected outcome:** A system that scales to millions of concurrent writers, handles petabytes of assertions, and maintains strong consistency guaranteeswith ~1/5th the complexity of CockroachDB.
**Next steps:** Implement Phase 1 (HLC integration) to validate timestamp generation and causality tracking before investing in Raft infrastructure.
---
## Sources
### Google Spanner
- [Spanner: TrueTime and external consistency | Google Cloud Documentation](https://docs.cloud.google.com/spanner/docs/true-time-external-consistency)
- [Strict Serializability and External Consistency in Spanner | Google Cloud Blog](https://cloud.google.com/blog/products/databases/strict-serializability-and-external-consistency-in-spanner)
- [Life of Spanner Reads & Writes | Google Cloud Documentation](https://docs.cloud.google.com/spanner/docs/whitepapers/life-of-reads-and-writes)
### CockroachDB
- [Parallel Commits: An atomic commit protocol for globally distributed transactions](https://www.cockroachlabs.com/blog/parallel-commits/)
- [How Pipelining consensus writes speeds up distributed SQL transactions](https://www.cockroachlabs.com/blog/transaction-pipelining/)
- [Replication Layer | CockroachDB Documentation](https://www.cockroachlabs.com/docs/stable/architecture/replication-layer)
- [Life of a Distributed Transaction | CockroachDB Documentation](https://www.cockroachlabs.com/docs/stable/architecture/life-of-a-distributed-transaction)
### Hybrid Logical Clocks
- [Hybrid Logical Clock in Rust | lib.rs](https://lib.rs/crates/hybrid-logical-clock)
- [Hybrid Logical Clocks | Kevin Sookocheff](https://sookocheff.com/post/time/hybrid-logical-clocks/)
- [uhlc-rs: A Unique Hybrid Logical Clock for Rust | GitHub](https://github.com/atolab/uhlc-rs)
- [Logical Physical Clocks Paper (PDF)](https://cse.buffalo.edu/~demirbas/publications/hlc.pdf)
- [Evolving Clock Sync in Distributed Databases | YugabyteDB](https://www.yugabyte.com/blog/evolving-clock-sync-for-distributed-databases/)
### Append-Only Systems
- [Append-Only Logs: The Immutable Diary of Data | Medium](https://medium.com/@komalshehzadi/append-only-logs-the-immutable-diary-of-data-58c36a871c7c)
- [Immutability Changes Everything | ACM Queue](https://queue.acm.org/detail.cfm?id=2884038)
- [The Rise of Immutable Data Stores | ODBMS.org](https://www.odbms.org/2015/10/the-rise-of-immutable-data-stores/)
- [Data Replication in Distributed Systems | Candost's Blog](https://candost.blog/books/data-replication-in-distributed-systems/)
### Sharding
- [System Design Concepts: Database Sharding Strategies](https://engineeringatscale.substack.com/p/system-design-concepts-dive-deep)
- [Sharding strategies: directory-based, range-based, and hash-based | PlanetScale](https://planetscale.com/blog/types-of-sharding)
- [The Ultimate Guide to Consistent Hashing | Toptal](https://www.toptal.com/big-data/consistent-hashing)
- [4 Data Sharding Strategies We Analyzed When Building YugabyteDB](https://www.yugabyte.com/blog/four-data-sharding-strategies-we-analyzed-in-building-a-distributed-sql-database/)
- [RDF Triple Store Architectures (PDF)](https://www.iaria.org/conferences2018/filesDBKDA18/IztokSavnik_Tutorial_3store-arch.pdf)
### Raft & Consensus
- [Raft Consensus Algorithm](https://raft.github.io/)
- [Fast Raft: Optimizations to the Raft Consensus Protocol | arXiv](https://arxiv.org/html/2506.17793v1)
- [Write Buffering to Reduce Raft Consensus Latency in YugabyteDB](https://dev.to/yugabyte/write-buffering-to-reduce-raft-consensus-latency-in-yugabytedb-2dg6)
- [An Improved Raft Consensus Algorithm Based on Asynchronous Batch Processing | Springer](https://link.springer.com/chapter/10.1007/978-981-19-2456-9_44)
### Rust Implementations
- [tikv/raft-rs: Raft distributed consensus algorithm implemented in Rust | GitHub](https://github.com/tikv/raft-rs)
- [Implement Raft in Rust | TiKV Blog](https://tikv.org/blog/implement-raft-in-rust/)
- [raft crate | crates.io](https://crates.io/crates/raft)
### CRDTs
- [Conflict-free Replicated Data Types | CRDT Tech](https://crdt.tech/)
- [How CRDTs and Rust are revolutionizing distributed systems](https://kerkour.com/rust-crdt)
- [CRDT Implementations | CRDT Tech](https://crdt.tech/implementations)
### Kafka & Replicated Logs
- [Kafka Replication | Confluent Documentation](https://docs.confluent.io/kafka/design/replication.html)
- [Append-only Log: The core of Kafka | Medium](https://medium.com/@aryan25822/append-only-log-the-core-of-kafka-5740fc965fe7)
- [Building a Distributed Log from Scratch, Part 2 | Brave New Geek](https://bravenewgeek.com/building-a-distributed-log-from-scratch-part-2-data-replication/)
- [The Log: What every software engineer should know | LinkedIn Engineering](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying)
### Content-Addressed Storage
- [Content Identifiers (CIDs) | IPFS Docs](https://docs.ipfs.tech/concepts/content-addressing/)
- [InterPlanetary File System | Wikipedia](https://en.wikipedia.org/wiki/InterPlanetary_File_System)
### Leader Election
- [Leader election in distributed systems | AWS Builders' Library](https://aws.amazon.com/builders-library/leader-election-in-distributed-systems/)
- [Top Leader Election Algorithms in Distributed Databases | ByteByteGo](https://blog.bytebytego.com/p/top-leader-election-algorithms-in)
- [Single Leader Replication | Medium](https://medium.com/@Omkar-Wagholikar/distributed-systems-deep-dive-single-leader-replication-0a96fb303036)
### Clock Skew
- [Clock Synchronization Is a Nightmare | Arpit Bhayani](https://arpitbhayani.me/blogs/clock-sync-nightmare/)
- [Clock Offset vs Clock Skew | Baeldung](https://www.baeldung.com/cs/clock-offset-skew-difference)