Distributed Write Path Research: CockroachDB/Spanner-Style for Episteme
Date: 2026-02-01
Status: Research Complete
Target: Design simplest correct distributed write path for append-only knowledge graph
Executive Summary
After comprehensive research into Google Spanner, CockroachDB, Hybrid Logical Clocks, and distributed append-only systems, we can implement a significantly simpler architecture than traditional distributed databases because:
- Assertions are immutable - no read-write conflicts
- Content-addressed - automatic deduplication
- Read-time conflict resolution - no distributed transactions needed
- Natural partition keys - subject/predicate provide excellent sharding boundaries
Recommendation: Use Hybrid Logical Clocks (HLC) + Single-Writer-Per-Partition + Simple Raft Replication instead of full Spanner-style distributed transactions.
1. Google Spanner's Architecture
Core Components
TrueTime API
- GPS + atomic clock synchronization providing <1ms uncertainty in 99th percentile
- Returns a time interval `[earliest, latest]` guaranteed to contain the actual time
- Enables externally consistent commit timestamps without a centralized timestamp oracle
External Consistency Guarantee
- If transaction T1 commits before T2 starts committing, then `timestamp(T2) > timestamp(T1)`
- Achieved via commit-wait: after picking `commit_timestamp`, the system waits until `TT.now().earliest > commit_timestamp` before making the commit visible
- Result: strict serializability, with transaction order matching real-time order globally
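To make the commit-wait rule concrete, here is a minimal simulation. Real TrueTime is a Google-internal API, so `TrueTime`, `TtInterval`, the simulated clock, and `epsilon_us` are all assumptions for illustration. The sketch picks `s = TT.now().latest` and then waits until `TT.now().earliest > s`, which shows why commit wait costs roughly 2ε per write.

```rust
/// Simulated TrueTime interval (real TrueTime is Google-internal; this is a stand-in).
#[derive(Debug, Clone, Copy)]
struct TtInterval {
    earliest: u64,
    latest: u64,
}

/// Simulated clock: `now_us` is the "true" time, `epsilon_us` the uncertainty bound.
struct TrueTime {
    now_us: u64,
    epsilon_us: u64,
}

impl TrueTime {
    fn now(&self) -> TtInterval {
        TtInterval {
            earliest: self.now_us.saturating_sub(self.epsilon_us),
            latest: self.now_us + self.epsilon_us,
        }
    }

    /// Stands in for sleeping: advance simulated time.
    fn advance(&mut self, us: u64) {
        self.now_us += us;
    }
}

/// Commit-wait: take s = TT.now().latest, then wait until TT.now().earliest > s,
/// so s is definitely in the past on every node before the commit becomes visible.
/// Returns (commit timestamp, simulated microseconds spent waiting).
fn commit_wait(tt: &mut TrueTime) -> (u64, u64) {
    let s = tt.now().latest;
    let start = tt.now_us;
    while tt.now().earliest <= s {
        tt.advance(1); // real code would sleep for the remaining uncertainty
    }
    (s, tt.now_us - start)
}
```

With ε = 500µs starting at t = 1,000,000µs, the commit timestamp is 1,000,500 and the wait is 1,001µs: two uncertainty bounds plus one polling step. Smaller ε (better clocks) directly shortens every commit.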
Paxos Groups
- Data partitioned into ranges, each range replicated via Paxos
- Writes flow: Client → Gateway → Paxos Leader → Replicas
- Read-only transactions skip locks (use snapshot isolation)
Key Insight for Episteme: Spanner's complexity stems from mutable records requiring distributed transactions. We don't have mutable records.
2. CockroachDB's Distributed Write Path
Architecture Layers
Client Request
↓
Transaction Coordinator (TxnCoordSender)
↓
Distribution Layer (DistSender) - routes to range leaseholder
↓
Replication Layer (Raft) - consensus among replicas
↓
Storage Layer (Pebble; RocksDB before v20.2) - persistent KV store
Key Optimizations
Parallel Commits
- Reduces commit latency by 50% (from 2 consensus rounds to 1)
- Introduces a `STAGING` transaction status
- Writes the final batch and the transaction record in parallel
- Improved p50 latency by 47%, throughput by 72% in benchmarks
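The key idea behind `STAGING` is that a transaction can be *implicitly* committed: its record says `STAGING` and every write it listed as in flight has been replicated. The following is a simplified model of that decision, not CockroachDB's code; `TxnStatus`, `TxnRecord`, and `resolve_status` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxnStatus {
    Pending,
    Staging,
    Committed,
    Aborted,
}

struct TxnRecord {
    status: TxnStatus,
    // Keys whose intents were written in parallel with the STAGING record
    in_flight_writes: Vec<u64>,
}

/// Decide the outcome of a transaction found in STAGING (e.g. by a reader that
/// hit one of its intents after the coordinator crashed).
/// `write_succeeded` reports whether the intent for a key was replicated.
fn resolve_status(record: &TxnRecord, write_succeeded: impl Fn(u64) -> bool) -> TxnStatus {
    match record.status {
        TxnStatus::Staging => {
            if record.in_flight_writes.iter().all(|&key| write_succeeded(key)) {
                // Implicitly committed: STAGING record + all writes durable
                TxnStatus::Committed
            } else {
                // Simplification: real recovery first prevents the missing write
                // from ever succeeding, and only then aborts
                TxnStatus::Aborted
            }
        }
        other => other,
    }
}
```

This is why the commit needs only one consensus round: the client can be acknowledged as soon as the parallel writes and the `STAGING` record are all durable, without waiting for a separate `COMMITTED` write.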
Transaction Pipelining
- Combines with parallel commits for near-theoretical minimum latency
- Total latency = sum(read latencies) + 1 consensus round
- Trade-off: Extra WAL write for STAGING status
MultiRaft
- Each range has own consensus group
- Nodes participate in thousands of concurrent Raft groups
- Leaseholder serves reads, coordinates writes
DistSender Intelligence
- Batches requests to same range
- Routes to current leaseholder (cached)
- Handles range splits/merges transparently
Key Insight for Episteme: CockroachDB's complexity handles multi-statement ACID transactions across ranges. Single assertion writes are much simpler.
Sources:
- Parallel Commits Protocol
- Transaction Pipelining
- Replication Layer Documentation
- Life of a Distributed Transaction
3. Hybrid Logical Clocks (HLC)
Why HLC Instead of TrueTime
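TrueTime requires GPS receivers and atomic clocks (expensive infrastructure). HLC uses NTP-synchronized wall clocks plus logical counters to produce causality-preserving timestamps that stay close to physical time. It does not bound clock uncertainty the way TrueTime does, so it cannot provide Spanner-style external consistency on its own; for single-assertion writes, causal ordering is what matters.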
HLC Structure
struct HLC {
physical: u64, // Wall clock time (NTP-synchronized)
logical: u64, // Counter for same physical timestamp
node_id: u64, // Tie-breaker for total ordering
}
Properties
- Causality Preservation: If event A happens-before event B, then `HLC(A) < HLC(B)`
- Close to Physical Time: Physical component stays within NTP bounds (~10-100ms typically)
- Monotonic: Timestamps never decrease, even with clock skew
- No Clock Updates: HLC reads but never modifies system clock
How It Works
// On local event
fn generate_timestamp() -> HLC {
let now = system_time();
if now > last_hlc.physical {
HLC { physical: now, logical: 0, node_id }
} else {
HLC { physical: last_hlc.physical, logical: last_hlc.logical + 1, node_id }
}
}
// On receiving message with remote timestamp
fn update_on_receive(remote_hlc: HLC) -> HLC {
let now = system_time();
let physical = max(now, remote_hlc.physical, last_hlc.physical);
let logical = if physical == last_hlc.physical {
last_hlc.logical + 1
} else if physical == remote_hlc.physical {
remote_hlc.logical + 1
} else {
0
};
HLC { physical, logical, node_id }
}
Clock Skew Handling
- CockroachDB default max offset: 500ms
- Nodes self-evict if skew exceeds threshold
- NTP typically achieves <10ms on LAN, <100ms on WAN
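The clock-skew rules above can be combined with the HLC update in one testable sketch. Here the wall clock is passed in as `wall_ms` (an assumption made so skew can be simulated), and a remote timestamp more than `max_offset_ms` ahead of the local clock is rejected, similar in spirit to CockroachDB's max-offset check but not its actual code. `Hlc`, `HlcClock`, and `ClockError` are hypothetical names.

```rust
/// HLC timestamp; field order makes the derived Ord compare
/// physical, then logical, then node_id (the tie-breaker).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Hlc {
    physical: u64, // milliseconds
    logical: u64,
    node_id: u64,
}

#[derive(Debug, PartialEq)]
enum ClockError {
    // Remote clock is further ahead than the configured max offset
    OffsetExceeded { remote_physical: u64, local_physical: u64 },
}

struct HlcClock {
    last: Hlc,
    max_offset_ms: u64,
}

impl HlcClock {
    fn new(node_id: u64, max_offset_ms: u64) -> Self {
        HlcClock { last: Hlc { physical: 0, logical: 0, node_id }, max_offset_ms }
    }

    /// Local event or send.
    fn now(&mut self, wall_ms: u64) -> Hlc {
        if wall_ms > self.last.physical {
            self.last.physical = wall_ms;
            self.last.logical = 0;
        } else {
            self.last.logical += 1; // wall clock stalled or went backwards
        }
        self.last
    }

    /// Receive a remote timestamp, rejecting senders whose clock is too far ahead.
    fn update(&mut self, remote: Hlc, wall_ms: u64) -> Result<Hlc, ClockError> {
        if remote.physical > wall_ms + self.max_offset_ms {
            return Err(ClockError::OffsetExceeded {
                remote_physical: remote.physical,
                local_physical: wall_ms,
            });
        }
        let prev = self.last;
        let physical = wall_ms.max(prev.physical).max(remote.physical);
        let logical = if physical == prev.physical && physical == remote.physical {
            prev.logical.max(remote.logical) + 1
        } else if physical == prev.physical {
            prev.logical + 1
        } else if physical == remote.physical {
            remote.logical + 1
        } else {
            0
        };
        self.last = Hlc { physical, logical, node_id: prev.node_id };
        Ok(self.last)
    }
}
```

Because every timestamp a node issues after receiving a message compares greater than that message's timestamp, a leader that observes A1's timestamp before stamping A2 automatically gets `timestamp(A2) > timestamp(A1)`.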
Rust Implementations
- uhlc-rs (atolab/uhlc-rs)
  - Production-ready, used in distributed systems
  - Unique identifier per clock
  - Monotonic timestamps close to physical time
- hlc-rs (tbg/hlc-rs)
  - Simpler implementation
  - Good starting point
- hlc-gen
  - Lock-free implementation
  - `#[no_std]` compatible
  - High throughput
Recommendation: Start with uhlc-rs - battle-tested and well-documented.
4. The Append-Only Advantage
What We Skip
No Distributed Transactions
- Single assertion write = single consensus operation
- No two-phase commit
- No transaction coordinator
- No deadlock detection
No Read-Write Conflicts
- Assertions never mutate
- No need for pessimistic locking
- No MVCC tombstones
- Reads never block writes
No Transaction Isolation Complexity
- No serializable snapshot isolation
- No read-write conflict tracking
- No transaction retry logic
Automatic Deduplication
- Content addressing: identical assertions have same ID
- Write is idempotent: `store(hash(content), content)`
- Replaying writes is safe
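A dependency-free sketch of the idempotent `store(hash(content), content)` write. It uses std's `DefaultHasher` as a stand-in for BLAKE3; `DefaultHasher` is not cryptographic and is used only to keep the example self-contained. `ContentStore` and `content_id` are hypothetical names, and hash collisions are ignored, which is reasonable only with a cryptographic hash like BLAKE3.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in for BLAKE3 content addressing (NOT collision-resistant).
fn content_id(content: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    content.hash(&mut h);
    h.finish()
}

#[derive(Default)]
struct ContentStore {
    blobs: HashMap<u64, Vec<u8>>,
}

impl ContentStore {
    /// store(hash(content), content): returns the ID and whether this call inserted.
    /// Replaying the same write is a no-op that returns the same ID.
    fn store(&mut self, content: &[u8]) -> (u64, bool) {
        let id = content_id(content);
        let inserted = !self.blobs.contains_key(&id);
        if inserted {
            self.blobs.insert(id, content.to_vec());
        }
        (id, inserted)
    }
}
```

Because the key is derived from the bytes, a retried or replayed write cannot create a second copy, which is what makes at-least-once delivery from the WAL or a client retry safe.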
What We Keep
Ordering Guarantees
- Supersession chains require causal ordering
- HLC provides total ordering across cluster
- Example: Assertion A2 supersedes A1, must ensure `timestamp(A2) > timestamp(A1)`
Durability
- Still need WAL + fsync
- Still need replication for fault tolerance
- Still need crash recovery
Consistency
- Raft consensus ensures replicas agree
- Leader election on failure
- No split-brain scenarios
Comparison: Full Spanner vs Episteme
| Feature | Spanner/CockroachDB | Episteme (Append-Only) |
|---|---|---|
| Write path | TxnCoordSender → DistSender → Raft | Direct → Raft |
| Transaction coordinator | Required | Not needed |
| Distributed locks | Yes (pessimistic) | No locks needed |
| Two-phase commit | Yes | No |
| Intent resolution | Complex | Not applicable |
| Read-write conflicts | Must detect and resolve | Cannot occur |
| Write amplification | High (intents + resolution) | Low (direct append) |
| Latency | 2-3 network hops | 1 consensus round trip (plus the gateway hop) |
Key Insight: Immutability eliminates entire classes of distributed systems problems.
Sources:
- Append-Only Logs
- Immutability Changes Everything (ACM)
- The Rise of Immutable Data Stores
- Data Replication in Distributed Systems
5. Sharding Strategy
Options Analysis
5.1 Hash-Based Sharding (by Assertion ID)
Shard = BLAKE3(assertion_content) % num_shards
Pros:
- Uniform distribution
- No hotspots
- Simple implementation
Cons:
- Related assertions (same subject) scattered across shards
- Query "all claims about subject X" requires scatter-gather
- No data locality
Verdict: ❌ Poor fit for knowledge graph queries
5.2 Range-Based Sharding (by Subject)
Shard_A: subjects [aaa, bbb)
Shard_B: subjects [bbb, ccc)
Shard_C: subjects [ccc, zzz]
Pros:
- All assertions about subject X on same shard
- Range queries efficient
- Good for "tell me everything about X"
Cons:
- Hotspot subjects (e.g., "USA", "COVID-19")
- Manual range assignment
- Complex rebalancing
Verdict: ⚠️ Good for queries, problematic for hot subjects
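The range assignment above amounts to a sorted map from each range's start key to a shard. A minimal sketch using `BTreeMap`, where `RangeRouter` and `split_at` are hypothetical names: a subject belongs to the range with the greatest start key at or below it.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Range-based router: maps each range's inclusive start key to a shard ID.
struct RangeRouter {
    starts: BTreeMap<String, u32>,
}

impl RangeRouter {
    /// A subject belongs to the range with the greatest start key <= subject.
    fn shard_for(&self, subject: &str) -> Option<u32> {
        self.starts
            .range::<str, _>((Bound::Unbounded, Bound::Included(subject)))
            .next_back()
            .map(|(_, &shard)| shard)
    }

    /// Splitting a hot range means adding a start key; data for the upper part
    /// must then be moved, which is the "complex rebalancing" cost.
    fn split_at(&mut self, start: &str, new_shard: u32) {
        self.starts.insert(start.to_string(), new_shard);
    }
}
```

Lookup is cheap, but the hotspot problem remains: every assertion about "COVID-19" lands in the same range until someone picks a split point and migrates data.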
5.3 Hash-Based by Subject (Recommended)
Shard = hash(subject_uri) % num_shards
Pros:
- All assertions about subject X on same shard
- Automatic load distribution
- No manual range management
- Simple shard assignment
Cons:
- Predicate-based queries still scatter-gather
- Rebalancing requires re-hashing (use consistent hashing)
Verdict: ✅ Best balance for Episteme
5.4 Composite: Subject + Predicate Hash
Shard = hash(subject_uri || predicate_uri) % num_shards
Pros:
- Even more specific data locality
- "Subject X with predicate P" queries hit single shard
Cons:
- "All claims about subject X" now scatters
- More complex query routing
Verdict: ⚠️ Only if predicate-specific queries dominate workload
Recommended Strategy
Primary: Hash-based sharding by Subject
fn assign_shard(assertion: &Assertion, num_shards: u32) -> u32 {
let hash = blake3::hash(assertion.subject.as_bytes());
u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}
With Consistent Hashing for Rebalancing:
Use jump hash or consistent hash ring to minimize data movement when adding/removing shards.
// Jump hash provides minimal key redistribution
fn jump_hash(key: u64, num_buckets: u32) -> u32 {
let mut b = -1i64;
let mut j = 0i64;
let mut k = key;
while j < num_buckets as i64 {
b = j;
k = k.wrapping_mul(2862933555777941757).wrapping_add(1);
j = ((b.wrapping_add(1)) as f64 * (f64::powi(2.0, 31) / ((k >> 33).wrapping_add(1)) as f64)) as i64;
}
b as u32
}
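To see why jump hash is preferred over plain `% num_shards` when shards are added, the sketch below counts how many keys change bucket when growing from `n` to `n + 1` buckets. `jump_hash` is the same Lamping-Veach algorithm as above (requires `num_buckets > 0`); `moved_keys` is a hypothetical helper.

```rust
/// Jump consistent hash (Lamping & Veach). Requires num_buckets > 0.
fn jump_hash(mut key: u64, num_buckets: u32) -> u32 {
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < num_buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

/// Count keys that change bucket when growing from n to n + 1 buckets,
/// under jump hash and under plain modulo.
fn moved_keys(keys: std::ops::Range<u64>, n: u32) -> (usize, usize) {
    let mut jump_moved = 0;
    let mut modulo_moved = 0;
    for k in keys {
        if jump_hash(k, n) != jump_hash(k, n + 1) {
            jump_moved += 1;
        }
        if k % n as u64 != k % (n as u64 + 1) {
            modulo_moved += 1;
        }
    }
    (jump_moved, modulo_moved)
}
```

For keys `0..11_000` going from 10 to 11 buckets, modulo reassigns exactly 10,000 keys, while jump hash only moves keys into the new bucket (roughly 1/11 of them); every other key keeps its old bucket.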
Sources:
- Database Sharding Strategies
- Sharding Types (PlanetScale)
- Consistent Hashing Guide
- YugabyteDB Sharding Analysis
- RDF Triple Store Partitioning
6. Write Amplification and Performance
Raft Write Overhead
Standard Raft Write Path:
- Client sends write to leader
- Leader appends to local WAL (fsync)
- Leader sends AppendEntries RPC to followers
- Followers append to local WAL (fsync)
- Followers respond to leader
- Leader commits when quorum reached
- Leader responds to client
Cost Analysis:
- Network: 1 round-trip (leader ↔ followers)
- Disk: 1 fsync at leader + (replication_factor - 1) fsyncs at followers
- Write Amplification: `replication_factor` (typically 3x-5x)
Batching Optimizations
Batch AppendEntries: Instead of sending RPC per write, batch N writes into single RPC.
// Naive: 1000 writes = 1000 RPCs
for write in writes {
leader.append_entries(vec![write]).await?;
}
// Optimized: 1000 writes = 1 RPC
leader.append_entries(writes).await?;
Performance Impact (from YugabyteDB):
- Throughput: 2x improvement with batching
- Latency: 30% reduction for p99
Write Buffering: Buffer writes in memory, flush when:
- Buffer reaches size threshold (e.g., 1MB)
- Time threshold reached (e.g., 10ms)
- Explicit flush request
Trade-off: Latency vs throughput
- Small batches (10ms): Lower latency, moderate throughput
- Large batches (100ms): Higher latency, maximum throughput
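The three flush triggers above (size, time, explicit request) fit in a small buffer type. This is a sketch under assumptions: `WriteBuffer`, `push`, `poll`, and `flush` are hypothetical, and time is passed in as an `Instant` so the timer path can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

/// Accumulates writes and hands back a batch when a flush is due.
struct WriteBuffer {
    pending: Vec<Vec<u8>>,
    pending_bytes: usize,
    max_bytes: usize,    // size threshold, e.g. 1 MB
    max_delay: Duration, // time threshold, e.g. 10 ms
    first_write_at: Option<Instant>,
}

impl WriteBuffer {
    fn new(max_bytes: usize, max_delay: Duration) -> Self {
        WriteBuffer { pending: Vec::new(), pending_bytes: 0, max_bytes, max_delay, first_write_at: None }
    }

    /// Buffer one write; returns a batch if the size threshold was reached.
    fn push(&mut self, write: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        self.first_write_at.get_or_insert(now);
        self.pending_bytes += write.len();
        self.pending.push(write);
        if self.pending_bytes >= self.max_bytes {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Called from a timer; returns a batch once the oldest write has waited max_delay.
    fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.first_write_at {
            Some(t) if now.duration_since(t) >= self.max_delay => Some(self.flush()),
            _ => None,
        }
    }

    /// Explicit flush: drain everything into one batch (one RPC / one WAL write).
    fn flush(&mut self) -> Vec<Vec<u8>> {
        self.pending_bytes = 0;
        self.first_write_at = None;
        std::mem::take(&mut self.pending)
    }
}
```

Measuring the delay from the *oldest* buffered write bounds the extra latency any single write can see to `max_delay`, which is the knob behind the latency vs throughput trade-off.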
Pipeline Optimization
CockroachDB's Parallel Commits Approach:
Standard 2-phase commit:
Write intents → Commit record → Resolve intents
(1 RTT) (1 RTT) (async)
Total: 2 RTT
Parallel commits:
Write intents + Commit record (parallel)
(1 RTT)
Total: 1 RTT
Episteme Equivalent: We don't have intents, but we can pipeline:
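// Sequential: WAL append, then index update (latencies add up)
let wal_entry = wal.append(&assertion).await?;
let index_update = index.update(&assertion).await?;
// Concurrent: both futures make progress together, so latency is roughly the
// slower of the two. Caveat: the index may briefly point at an entry whose WAL
// write then fails, so readers must tolerate dangling index entries.
let (wal_res, index_res) = tokio::join!(
    wal.append(&assertion),
    index.update(&assertion),
);
let (wal_entry, index_update) = (wal_res?, index_res?);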
Expected Performance Numbers (estimates, not yet measured)
Local Write (Single Node):
- Latency: 1-5ms (dominated by fsync)
- Throughput: 10K-50K writes/sec (NVMe SSD; assumes group commit, so many writes share one fsync)
Distributed Write (3 Replicas, Same DC):
- Latency: 5-15ms (1 RTT + fsync)
- Throughput: 5K-20K writes/sec per shard
Distributed Write (3 Replicas, Multi-Region):
- Latency: 50-200ms (depends on geography)
- Throughput: 1K-5K writes/sec per shard
Batched Writes (100 assertions/batch):
- Throughput: 100K-500K assertions/sec per shard
With 100 Shards:
- Aggregate throughput: 10M-50M assertions/sec
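A back-of-envelope check on how the batched figures relate to commit latency. All inputs are assumptions, and the model ignores CPU and disk bandwidth limits: with pipelining, up to `inflight_batches` consensus rounds overlap, each committing `batch_size` assertions.

```rust
/// Back-of-envelope shard throughput: overlapping consensus rounds, each
/// committing `batch_size` assertions, every `commit_latency_ms`.
fn shard_throughput_per_sec(batch_size: u64, inflight_batches: u64, commit_latency_ms: u64) -> u64 {
    batch_size * inflight_batches * 1_000 / commit_latency_ms
}

/// Aggregate throughput, assuming shards are independent and evenly loaded.
fn cluster_throughput_per_sec(per_shard: u64, num_shards: u64) -> u64 {
    per_shard * num_shards
}
```

With one batch of 100 in flight and a 10ms commit, a shard tops out near 10K assertions/sec; reaching 100K/sec implies about 10 batches in flight, which the `max_inflight_msgs: 256` setting in section 7.3 would permit. Multiplying by 100 evenly loaded shards gives the 10M/sec lower bound quoted above.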
Sources:
- Raft Consensus Algorithm
- Fast Raft Optimizations
- YugabyteDB Write Buffering
- Improved Raft with Batch Processing
7. Concrete Architecture Proposal
The Simplest Correct Design
┌─────────────────────────────────────────────────────────┐
│ Agent (Writer) │
│ │
│ 1. Generate Assertion │
│ 2. Sign with Ed25519 │
│ 3. Compute Content Hash (BLAKE3) │
└────────────────┬────────────────────────────────────────┘
│ HTTP POST /assertions
▼
┌─────────────────────────────────────────────────────────┐
│ Gateway Node (Any) │
│ │
│ 1. Validate signature │
│ 2. Compute shard: hash(subject) % num_shards │
│ 3. Route to shard leader │
└────────────────┬────────────────────────────────────────┘
│ gRPC → Shard Leader
▼
┌─────────────────────────────────────────────────────────┐
│ Shard N (3-5 Replicas) │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Leader Node │ │
│ │ │ │
│ │ 1. Generate HLC timestamp │ │
│ │ 2. Append to WAL (fsync) │ │
│ │ 3. Propose to Raft group │◄──────────┤── Raft RPCs
│ │ 4. Wait for quorum │ │
│ │ 5. Apply to KV store │ │
│ │ 6. Update indexes (async) │ │
│ │ 7. Respond to gateway │ │
│ └──────────────────────────────────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Follower 1 │ │ Follower 2 │ │ Follower 3 │ │
│ │ │ │ │ │ │ │
│ │ - Replicate │ │ - Replicate │ │ - Replicate │ │
│ │ - Apply │ │ - Apply │ │ - Apply │ │
│ │ - Ready for │ │ - Ready for │ │ - Ready for │ │
│ │ failover │ │ failover │ │ failover │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
Component Breakdown
7.1 Gateway Layer (Stateless)
Responsibilities:
- Accept HTTP/gRPC requests
- Authenticate agents (verify Ed25519 signatures)
- Route requests to correct shard
- Cache shard topology
- Return responses to clients
Implementation:
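use std::sync::Arc;
use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use tokio::sync::RwLock;
// Stateless gateway: the only state is a cached shard map (Clone is required by axum's State)
#[derive(Clone)]
struct Gateway {
    shard_map: Arc<RwLock<ShardMap>>,
    http_client: reqwest::Client,
}
fn router(gateway: Gateway) -> Router {
    Router::new().route("/assertions", post(handle_assertion)).with_state(gateway)
}
async fn handle_assertion(
    State(gateway): State<Gateway>,
    Json(assertion): Json<Assertion>,
) -> Result<StatusCode, ApiError> {
    // 1. Validate signature before doing any routing work
    assertion.verify_signature()?;
    // 2-3. Compute shard and find its leader under a single read lock
    let leader_addr = {
        let map = gateway.shard_map.read().await;
        let shard_id = compute_shard(&assertion.subject, map.num_shards);
        map.get_leader(shard_id)?
    };
    // 4. Forward to leader; non-2xx responses become errors instead of a false 201
    gateway.http_client
        .post(format!("{}/internal/append", leader_addr))
        .json(&assertion)
        .send()
        .await?
        .error_for_status()?;
    Ok(StatusCode::CREATED)
}
fn compute_shard(subject: &str, num_shards: u32) -> u32 {
    let hash = blake3::hash(subject.as_bytes());
    // First 4 bytes of the BLAKE3 digest, read as a little-endian u32
    u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}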
Scaling: Run many gateway instances behind load balancer.
7.2 Shard Leader (Stateful)
Responsibilities:
- Accept writes for this shard's key range
- Generate HLC timestamps
- Append to WAL
- Replicate via Raft
- Apply committed entries to storage
- Update indexes
- Handle leader election
Implementation:
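use std::sync::Arc;
use raft::{storage::MemStorage, RawNode};
use tokio::sync::Mutex;
use uhlc::HLC;
struct ShardLeader {
    shard_id: u32,
    hlc: Arc<Mutex<HLC>>,
    wal: WriteAheadLog,
    kv_store: KVStore,
    // Shared with the Raft driver loop (7.3), which steps, persists, and applies
    raft_node: Arc<Mutex<RawNode<MemStorage>>>,
    index_store: IndexStore,
}
impl ShardLeader {
    async fn append_assertion(&self, assertion: Assertion) -> Result<AssertionId, StorageError> {
        // 1. Content address: hash the assertion itself, not the timestamped
        //    wrapper, so identical assertions always map to the same ID
        let assertion_id = AssertionId::from_hash(blake3::hash(&serialize(&assertion)?));
        // 2. Generate HLC timestamp
        let timestamp = self.hlc.lock().await.new_timestamp();
        // 3. Create versioned assertion and serialize it
        let versioned = VersionedAssertion {
            assertion,
            timestamp,
            shard_id: self.shard_id,
        };
        let bytes = serialize(&versioned)?;
        // 4. Propose to Raft (the Raft loop does the WAL append + replication).
        //    RawNode::propose(context, data) returns Result<()>, not an ID, so the
        //    proposal is tagged with a request ID (hypothetical helper) in `context`
        let request_id = self.next_request_id();
        self.raft_node
            .lock()
            .await
            .propose(request_id.to_le_bytes().to_vec(), bytes.clone())?;
        // 5. Wait until the Raft loop reports this request ID committed
        //    (e.g. via a oneshot channel keyed by request_id)
        self.wait_for_commit(request_id).await?;
        // 6. Apply to KV store (idempotent). In a full design this happens in the
        //    Raft apply loop on every replica; repeating it here is harmless
        self.kv_store.put(assertion_id.as_bytes(), &bytes)?;
        // 7. Update indexes (async, best-effort; indexes must be rebuildable from the log)
        tokio::spawn({
            let index_store = self.index_store.clone();
            let versioned = versioned.clone();
            async move {
                let _ = index_store.index_assertion(&versioned).await;
            }
        });
        Ok(assertion_id)
    }
}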
7.3 Raft Replication (tikv/raft-rs)
Configuration:
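use raft::{prelude::ConfState, storage::MemStorage, Config, RawNode};
use slog::Logger;
fn create_raft_node(node_id: u64, peers: Vec<u64>, logger: &Logger) -> raft::Result<RawNode<MemStorage>> {
    let config = Config {
        id: node_id,
        election_tick: 10,  // followers start an election after 10 ticks without a heartbeat
        heartbeat_tick: 3,  // leader sends heartbeats every 3 ticks
        max_size_per_msg: 1024 * 1024, // 1MB
        max_inflight_msgs: 256,
        ..Default::default()
    };
    // raft-rs 0.6+ reads the initial voter set from storage (there is no `peers`
    // argument) and requires an slog Logger. MemStorage is in-memory only; a durable
    // deployment needs its own `Storage` implementation backed by the WAL.
    let storage = MemStorage::new_with_conf_state(ConfState::from((peers, vec![])));
    RawNode::new(&config, storage, logger)
}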
Raft Message Handling:
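use std::{sync::Arc, time::Duration};
use raft::{prelude::Message, storage::MemStorage, RawNode};
use tokio::sync::{mpsc, Mutex};
// Drives one Raft group. persist_entries, persist_hard_state, send_to_peers and
// apply_entries are hypothetical helpers; snapshot handling and persisting the
// commit index from LightReady are omitted for brevity.
async fn run_raft_loop(
    raft_node: Arc<Mutex<RawNode<MemStorage>>>,
    mut inbox: mpsc::Receiver<Message>, // messages from peers, fed by the gRPC layer
) {
    // 100ms ticks: with election_tick = 10 the election timeout is ~1s.
    // New proposals are picked up on the next wake-up; a production loop would
    // also wake on a proposal channel.
    let mut ticker = tokio::time::interval(Duration::from_millis(100));
    loop {
        // Wait for a tick or a peer message without holding the lock
        tokio::select! {
            _ = ticker.tick() => {
                raft_node.lock().await.tick();
            }
            Some(msg) = inbox.recv() => {
                let _ = raft_node.lock().await.step(msg);
            }
        }
        let mut node = raft_node.lock().await;
        if !node.has_ready() {
            continue;
        }
        let mut ready = node.ready();
        // 1. Leader messages may be sent before local persistence
        send_to_peers(ready.take_messages()).await;
        // 2. Apply entries a quorum has already committed
        apply_entries(ready.take_committed_entries()).await;
        // 3. Persist new log entries and hard state (term, vote, commit) to the WAL
        if !ready.entries().is_empty() {
            persist_entries(ready.entries()).await;
        }
        if let Some(hs) = ready.hs() {
            persist_hard_state(hs).await;
        }
        // 4. Messages that may only be sent after persistence (e.g. append/vote responses)
        send_to_peers(ready.take_persisted_messages()).await;
        // 5. Advance; the LightReady can carry further messages and committed entries
        let mut light = node.advance(ready);
        send_to_peers(light.take_messages()).await;
        apply_entries(light.take_committed_entries()).await;
        node.advance_apply();
    }
}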
7.4 Storage Layer
Write-Ahead Log:
use std::io;
use tokio::{fs::File, io::AsyncWriteExt};
struct WriteAheadLog {
    file: File,          // opened in append mode
    current_offset: u64, // byte offset where the next record starts
}
impl WriteAheadLog {
    async fn append(&mut self, entry: &RaftEntry) -> Result<u64, io::Error> {
        let bytes = serialize(entry)?; // assumed to return io::Result<Vec<u8>>
        let checksum = crc32c::crc32c(&bytes);
        // Write: [len: u32][checksum: u32][data: bytes], little-endian header
        let record = [
            &(bytes.len() as u32).to_le_bytes()[..],
            &checksum.to_le_bytes()[..],
            &bytes[..],
        ].concat();
        self.file.write_all(&record).await?;
        // fsync per record; syncing once per batch (group commit) amortizes this cost
        self.file.sync_all().await?;
        let offset = self.current_offset;
        self.current_offset += record.len() as u64;
        Ok(offset)
    }
}
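The append path writes `[len][checksum][data]` records, but the recovery side is not shown. Here is a dependency-free sketch of it: a bitwise CRC-32C (the `crc32c` crate computes the same values, faster) and a hypothetical `recover` function that replays records until the first torn or corrupt one and reports the offset to truncate the file to. Treating everything after the first bad record as garbage is an assumption that holds for a single, sequentially written append-only file.

```rust
/// CRC-32C (Castagnoli), bitwise reflected implementation.
fn crc32c(data: &[u8]) -> u32 {
    let mut crc = !0u32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            // If the low bit is set, shift and XOR with the reflected polynomial
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0x82F6_3B78 & mask);
        }
    }
    !crc
}

/// Encode one record as [len: u32 LE][checksum: u32 LE][data].
fn encode_record(data: &[u8]) -> Vec<u8> {
    let mut rec = Vec::with_capacity(8 + data.len());
    rec.extend_from_slice(&(data.len() as u32).to_le_bytes());
    rec.extend_from_slice(&crc32c(data).to_le_bytes());
    rec.extend_from_slice(data);
    rec
}

/// Crash recovery: returns valid payloads and the byte offset to truncate to.
fn recover(log: &[u8]) -> (Vec<Vec<u8>>, usize) {
    let mut records = Vec::new();
    let mut offset = 0;
    while log.len() - offset >= 8 {
        let len = u32::from_le_bytes(log[offset..offset + 4].try_into().unwrap()) as usize;
        let checksum = u32::from_le_bytes(log[offset + 4..offset + 8].try_into().unwrap());
        let start = offset + 8;
        if log.len() - start < len {
            break; // torn write: header present but payload cut short
        }
        let data = &log[start..start + len];
        if crc32c(data) != checksum {
            break; // corrupt record; everything after it is untrusted
        }
        records.push(data.to_vec());
        offset = start + len;
    }
    (records, offset)
}
```

On restart, the node would truncate the WAL to the returned offset before accepting new appends, so a half-written record from a crash never gets replayed or followed by valid data.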
KV Store (RocksDB):
use std::sync::Arc;
use rocksdb::{WriteBatch, DB};
struct KVStore {
db: Arc<DB>,
}
impl KVStore {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
self.db.put(key, value)
}
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, rocksdb::Error> {
self.db.get(key)
}
fn batch_put(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), rocksdb::Error> {
let mut batch = WriteBatch::default();
for (key, value) in entries {
batch.put(key, value);
}
self.db.write(batch)
}
}
7.5 Index Store (Async Updates)
Subject Index:
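struct SubjectIndex {
    db: Arc<DB>,
}
// Key layout: b"subject:" + subject + 0x00 + big-endian timestamp. The 0x00
// separator keeps a scan for subject "a" from matching subject "a:b", and
// big-endian bytes make key order equal timestamp order.
fn subject_prefix(subject: &str) -> Vec<u8> {
    let mut key = b"subject:".to_vec();
    key.extend_from_slice(subject.as_bytes());
    key.push(0x00);
    key
}
impl SubjectIndex {
    async fn index_assertion(&self, assertion: &VersionedAssertion) -> Result<(), StorageError> {
        let mut key = subject_prefix(&assertion.assertion.subject);
        // Assumes the timestamp can be encoded as fixed-width big-endian bytes
        key.extend_from_slice(&assertion.timestamp.to_be_bytes());
        self.db.put(&key, assertion.assertion.id.as_bytes())?;
        Ok(())
    }
    async fn query_subject(&self, subject: &str, limit: usize) -> Result<Vec<AssertionId>, StorageError> {
        let prefix = subject_prefix(subject);
        let mut results = Vec::new();
        // rust-rocksdb iterators yield Result<(key, value), Error>
        for item in self.db.prefix_iterator(&prefix) {
            let (key, value) = item?;
            // Without a configured prefix extractor, the iterator can run past the prefix
            if !key.starts_with(&prefix) || results.len() >= limit {
                break;
            }
            results.push(AssertionId::from_bytes(&value)?);
        }
        Ok(results)
    }
}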
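The subject-index key layout can be checked without RocksDB by using a `BTreeMap` as a stand-in for an ordered key-value store. In this sketch, `subject_key` and `query_subject` are hypothetical helpers, timestamps are plain `u64` values, and the layout assumes subjects never contain a NUL byte (true for URIs).

```rust
use std::collections::BTreeMap;

/// b"subject:" + subject + 0x00; the separator prevents "a" matching "a:b".
fn subject_prefix(subject: &str) -> Vec<u8> {
    let mut key = b"subject:".to_vec();
    key.extend_from_slice(subject.as_bytes());
    key.push(0x00);
    key
}

/// Append the timestamp as big-endian bytes so byte order equals time order.
fn subject_key(subject: &str, timestamp: u64) -> Vec<u8> {
    let mut key = subject_prefix(subject);
    key.extend_from_slice(&timestamp.to_be_bytes());
    key
}

/// Prefix scan standing in for a RocksDB iterator: seek to the prefix, then
/// stop at the first key that no longer matches.
fn query_subject(index: &BTreeMap<Vec<u8>, u64>, subject: &str, limit: usize) -> Vec<u64> {
    let prefix = subject_prefix(subject);
    index
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .take(limit)
        .map(|(_, &id)| id)
        .collect()
}
```

Results come back oldest first; a "latest N claims" query would iterate the same prefix in reverse.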
7.6 Failure Handling
Leader Failure:
- Followers detect missing heartbeats (election_timeout)
- Follower transitions to candidate, increments term
- Candidate requests votes from peers
- Majority votes → becomes new leader
- New leader sends heartbeats
- Gateway detects new leader (via Raft status API)
Follower Failure:
- Leader detects missing ACKs
- Leader continues with remaining quorum
- When follower recovers, catches up via log replication
Network Partition:
- Minority partition cannot elect leader (no quorum)
- Minority partition rejects writes
- Majority partition continues serving writes
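- When the partition heals, minority nodes discard any uncommitted conflicting entries and catch up from the majority leader's log
Consistency Guarantee: Raft provides linearizable writes; reads are linearizable only when served through the leader with a ReadIndex check or a valid leader lease. Linearizability is the strongest consistency model for single-object operations.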
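The failure cases above all reduce to majority arithmetic. A small sketch with hypothetical helper names: a side of a partition can elect a leader and commit writes only if it holds a majority of the *full* membership.

```rust
/// Majority quorum for a Raft group of `n` voters.
fn quorum(n: usize) -> usize {
    n / 2 + 1
}

/// Simultaneous voter failures the group survives while staying available.
fn tolerated_failures(n: usize) -> usize {
    n - quorum(n)
}

/// A partition side can make progress only with a majority of all voters,
/// not a majority of the nodes it can currently see.
fn can_make_progress(reachable_voters: usize, n: usize) -> bool {
    reachable_voters >= quorum(n)
}
```

The 4-voter case shows why replica counts are usually odd: a fourth replica raises the quorum to 3 without tolerating an extra failure, and a 2/2 split leaves neither side able to make progress.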
7.7 What We Skip
No Transaction Coordinator:
- Single assertion = single Raft proposal
- No multi-shard transactions
No Intent Resolution:
- CockroachDB writes "intents" then resolves them
- We write final values directly
No Read-Write Conflict Detection:
- Assertions are immutable
- No conflict possible
No Distributed Locks:
- No lock table
- No deadlock detection
No MVCC Garbage Collection:
- Assertions never deleted
- No tombstones to clean up
8. Rust Crates to Use
Core Dependencies
[dependencies]
# Consensus
raft = "0.7" # TiKV's Raft implementation
raft-proto = "0.7" # Protobuf definitions
slog = "2" # Logger type required by raft-rs (RawNode::new takes a &slog::Logger)
# Hybrid Logical Clock
uhlc = "0.5" # Battle-tested HLC
# Storage
rocksdb = "0.21" # Embedded KV store
sled = "0.34" # Alternative: pure-Rust KV store
# Serialization
rkyv = "0.7" # Zero-copy serialization (already using)
blake3 = "1.5" # Content addressing (already using)
# Async runtime
tokio = { version = "1.35", features = ["full"] }
tokio-stream = "0.1"
# Networking
tonic = "0.10" # gRPC (Raft messages)
axum = "0.7" # HTTP gateway (already using)
tower = "0.4" # Middleware
# Error handling
anyhow = "1.0"
thiserror = "1.0"
# Observability
tracing = "0.1" # Already using
tracing-subscriber = "0.3"
metrics = "0.21"
Optional (Future)
# If we need service discovery
etcd-client = "0.12"
# If we want built-in consensus
tikv-client = "0.2" # Use TiKV as storage backend
# If we need distributed tracing
opentelemetry = "0.21"
opentelemetry-jaeger = "0.20"
9. Implementation Phases
Phase 1: Single-Node with HLC (1-2 weeks)
Goal: Prove HLC integration + baseline performance
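// Extend current ingestion with HLC (single node, no replication yet)
struct IngestionPipeline {
    hlc: Arc<Mutex<HLC>>,
    wal: Mutex<WriteAheadLog>, // append() takes &mut self, so serialize access
    kv_store: KVStore,
}
impl IngestionPipeline {
    async fn ingest(&self, assertion: Assertion) -> Result<AssertionId, Error> {
        // ID is derived from the assertion content only (not the timestamp),
        // so re-ingesting the same assertion is idempotent
        let id = AssertionId::from_content(&assertion);
        let timestamp = self.hlc.lock().await.new_timestamp();
        let versioned = VersionedAssertion { assertion, timestamp, shard_id: 0 }; // single node: shard 0
        // Existing WAL append logic (durable before acknowledging)
        self.wal.lock().await.append(&versioned).await?;
        // Existing KV store logic
        self.kv_store.put(id.as_bytes(), &serialize(&versioned)?)?;
        Ok(id)
    }
}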
Tests:
- HLC monotonicity under concurrent writes
- HLC causality preservation (ingest A, ingest B superseding A, verify timestamps)
- Performance baseline (writes/sec, p99 latency)
Phase 2: Single-Shard with Raft Replication (3-4 weeks)
Goal: Fault-tolerant single shard (3 replicas)
Architecture:
Writer → Leader (with Raft) → Followers (2x)
Implementation:
- Wrap `IngestionPipeline` with Raft layer
- Leader proposes writes to Raft group
- Followers replicate and apply
- Leader responds when committed
Tests:
- Leader failure: kill leader, verify follower elected, writes continue
- Follower failure: kill follower, verify writes continue with 2/3 quorum
- Recovery: restart failed node, verify catches up
- Split-brain: network partition, verify minority rejects writes
Phase 3: Multi-Shard with Gateway (4-6 weeks)
Goal: Horizontal scalability
Architecture:
Writers → Gateway Layer (stateless, N instances)
↓
┌─────────┼─────────┐
Shard 0 Shard 1 Shard 2
(3 replicas each)
Implementation:
- Gateway routes by `hash(subject) % num_shards`
- Shard topology stored in etcd or Raft metadata
- Gateway caches topology, refreshes on errors
- Dynamic shard addition (consistent hashing)
Tests:
- Load balancing: verify writes distributed evenly
- Shard failover: kill a shard's leader, verify the gateway routes to the newly elected leader (losing all replicas of a shard makes that shard unavailable)
- Rebalancing: add shard, verify rehashing redistributes data
- Hotspot handling: send 10K writes to same subject, verify throughput
Phase 4: Production Hardening (ongoing)
Observability:
use tracing::{info, instrument};
// Metrics (metrics 0.21 macro syntax: name, value, then labels)
metrics::counter!("assertions_ingested_total", 1, "shard" => shard_id.to_string());
metrics::histogram!("raft_commit_duration_ms", commit_duration.as_millis() as f64);
metrics::gauge!("raft_leader_id", leader_id as f64);
// Tracing (assertion is skipped as a whole; only its ID is recorded)
#[instrument(skip(self, assertion), fields(shard_id = %self.shard_id, assertion_id = %assertion.id))]
async fn append_assertion(&self, assertion: Assertion) -> anyhow::Result<()> {
    info!("Appending assertion");
    // ...
}
Chaos Testing:
- Random node kills
- Network latency injection
- Disk failures
- Clock skew simulation
Performance Tuning:
- Batch size optimization (find sweet spot for latency vs throughput)
- RocksDB compaction tuning
- Raft message batching
- Index update batching
10. Key Design Decisions Summary
| Decision | Choice | Rationale |
|---|---|---|
| Clock | Hybrid Logical Clock (HLC) | Causality + physical time without GPS clocks |
| Consensus | Raft (tikv/raft-rs) | Simple, proven, excellent Rust implementation |
| Sharding | Hash by subject | Data locality for common queries |
| Replication | 3 replicas per shard | Tolerate 1 failure (quorum = 2 of 3) |
| Storage | RocksDB (via rust-rocksdb) | LSM tree, proven at scale, good Rust bindings |
| Serialization | rkyv (already using) | Zero-copy deserialization, among the fastest Rust options in benchmarks |
| Transactions | None (single-assertion writes) | Append-only eliminates need |
| Gateway | Stateless HTTP/gRPC | Easy to scale horizontally |
11. Performance Expectations
Single Shard (3 Replicas, Same Datacenter)
| Metric | No Batching | With Batching (100/batch) |
|---|---|---|
| Write Latency (p50) | 5-10ms | 5-10ms |
| Write Latency (p99) | 15-30ms | 15-30ms |
| Throughput | 5K-20K/sec | 100K-500K/sec |
| Write Amplification | 3x (replication) | 3x |
100 Shards
| Metric | Value |
|---|---|
| Aggregate Throughput | 10M-50M assertions/sec |
| Storage Efficiency | ~70% (accounting for WAL, indexes) |
| Fault Tolerance | Lose any 1 replica per shard |
Resource Requirements (per node)
| Resource | Requirement |
|---|---|
| CPU | 8-16 cores |
| RAM | 32-64 GB |
| Disk | 1-4 TB NVMe SSD |
| Network | 10 Gbps |
12. Comparison: Traditional vs Episteme
CockroachDB Transaction Flow
1. Client: BEGIN TRANSACTION
2. TxnCoordSender: Create transaction record
3. DistSender: Route to ranges, write intents
4. Raft: Replicate intents (2 RTT)
5. TxnCoordSender: Write commit record
6. Raft: Replicate commit (1 RTT)
7. TxnCoordSender: Resolve intents (async)
8. Client: Transaction complete
Total: ~3-4 RTT, complex state machine
Episteme Assertion Flow
1. Client: POST /assertions
2. Gateway: Route to shard leader
3. Leader: Propose to Raft
4. Raft: Replicate and commit (1 RTT)
5. Leader: Apply to storage
6. Gateway: Return assertion ID
Total: ~1-2 RTT, simple state machine
Estimated latency reduction: 50-75% vs a traditional distributed database
Estimated code complexity: ~80% less than the CockroachDB transaction coordinator
13. Open Questions & Future Work
Q1: How to handle supersession chains across shards?
Problem: Assertion A1 on shard X supersedes assertion A0 on shard Y. How to ensure ordering?
- Option A: Require supersession to happen on same shard (reject if subjects differ)
- Option B: Use HLC timestamps, accept eventual consistency for cross-shard supersession
- Option C: Introduce lightweight cross-shard coordination (2PC only for supersession)
Recommendation: Start with Option B (eventual consistency). Most supersession chains will be same subject (same shard).
Q2: How to query across multiple shards efficiently?
Problem: Query "all assertions about subjects in category X" may span shards.
- Option A: Scatter-gather (query all shards, merge results)
- Option B: Secondary index shard (index by predicate or category)
- Option C: Materialized views pre-aggregated by common queries
Recommendation: Implement Option A first, optimize hot queries with Option C.
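For Option A, the gather step is a k-way merge: each shard returns its rows already sorted by timestamp (as a subject-index scan does), and the gateway merges them with a min-heap. A sketch where `merge_shard_results` is a hypothetical helper and rows are `(timestamp, assertion_id)` pairs of `u64` stand-ins.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Scatter-gather merge of per-shard results, each sorted by timestamp.
fn merge_shard_results(shards: &[Vec<(u64, u64)>], limit: usize) -> Vec<(u64, u64)> {
    // Min-heap (via Reverse) of the next unread row from each shard:
    // (timestamp, assertion_id, shard index, position within that shard)
    let mut heap = BinaryHeap::new();
    for (shard, rows) in shards.iter().enumerate() {
        if let Some(&(ts, id)) = rows.first() {
            heap.push(Reverse((ts, id, shard, 0usize)));
        }
    }
    let mut merged = Vec::with_capacity(limit);
    while merged.len() < limit {
        match heap.pop() {
            Some(Reverse((ts, id, shard, pos))) => {
                merged.push((ts, id));
                // Refill from the shard we just consumed
                if let Some(&(next_ts, next_id)) = shards[shard].get(pos + 1) {
                    heap.push(Reverse((next_ts, next_id, shard, pos + 1)));
                }
            }
            None => break, // every shard exhausted
        }
    }
    merged
}
```

Each shard only needs to return its own top `limit` rows, so the gateway holds at most one pending row per shard in the heap, and merging costs O(limit · log shards).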
Q3: What's the story for multi-region deployments?
Problem: Replicate across US-East, US-West, EU for low-latency reads globally.
- Option A: Single Raft group across regions (high write latency)
- Option B: Regional Raft groups with async replication (eventual consistency)
- Option C: Spanner-style Paxos with commit-wait (requires TrueTime)
Recommendation: Start single-region, then explore Option B (similar to CockroachDB follower reads).
Q4: How to handle hot shards?
Problem: Celebrity subject (e.g., "Donald Trump") receives 100K writes/sec.
- Option A: Over-provision that shard (more CPU/disk)
- Option B: Sub-shard by predicate (split "Donald Trump" + "nationality" vs "Donald Trump" + "age")
- Option C: Use consistent hashing to split hot subject across multiple sub-shards
Recommendation: Monitor for hotspots, implement Option A initially, Option C if needed.
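One concrete way to realize Option C is *salting*: a subject flagged as hot is spread over `fanout` salted keys, writes pick a salt from the assertion ID, and reads fan out to every salted sub-shard. This is a sketch under assumptions: `write_shard`, `read_shards`, and the hot-subject flag (`hot_fanout`) are hypothetical, and `DefaultHasher` stands in for the BLAKE3 shard function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Route a write. Hot subjects are salted with assertion_id % fanout, so a
/// given assertion always lands on the same sub-shard (keeping writes idempotent).
fn write_shard(subject: &str, assertion_id: u64, hot_fanout: Option<u64>, num_shards: u64) -> u64 {
    match hot_fanout {
        Some(fanout) => hash_of(&(subject, assertion_id % fanout)) % num_shards,
        None => hash_of(&subject) % num_shards,
    }
}

/// Shards a read must visit: one for a normal subject, every salt for a hot one.
fn read_shards(subject: &str, hot_fanout: Option<u64>, num_shards: u64) -> HashSet<u64> {
    match hot_fanout {
        Some(fanout) => (0..fanout).map(|salt| hash_of(&(subject, salt)) % num_shards).collect(),
        None => HashSet::from([hash_of(&subject) % num_shards]),
    }
}
```

The trade-off mirrors the sharding discussion in section 5: write load for the hot subject is divided across up to `fanout` shards, but "everything about X" queries for that subject become a small scatter-gather.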
14. Recommended Reading
Essential Papers
- Raft Consensus Algorithm - Diego Ongaro & John Ousterhout
- Logical Physical Clocks (HLC) - Kulkarni & Demirbas
- Spanner: Google's Globally-Distributed Database - Corbett et al.
Blog Posts
Books
- Designing Data-Intensive Applications - Martin Kleppmann (Chapters 5, 7, 9)
Conclusion
For Episteme's append-only, content-addressed knowledge graph, we can build a significantly simpler distributed write path than traditional databases:
- Use HLC instead of TrueTime - No specialized hardware needed
- Single-writer-per-partition - Eliminates distributed transactions
- Raft for replication - Simple, battle-tested consensus
- Hash-based sharding by subject - Natural data locality
- Async index updates - Decouple write path from read optimization
Expected outcome: A system that scales to millions of concurrent writers, handles petabytes of assertions, and maintains strong per-shard consistency guarantees, with roughly 1/5th the complexity of CockroachDB.
Next steps: Implement Phase 1 (HLC integration) to validate timestamp generation and causality tracking before investing in Raft infrastructure.
Sources
Google Spanner
- Spanner: TrueTime and external consistency | Google Cloud Documentation
- Strict Serializability and External Consistency in Spanner | Google Cloud Blog
- Life of Spanner Reads & Writes | Google Cloud Documentation
CockroachDB
- Parallel Commits: An atomic commit protocol for globally distributed transactions
- How Pipelining consensus writes speeds up distributed SQL transactions
- Replication Layer | CockroachDB Documentation
- Life of a Distributed Transaction | CockroachDB Documentation
Hybrid Logical Clocks
- Hybrid Logical Clock in Rust | lib.rs
- Hybrid Logical Clocks | Kevin Sookocheff
- uhlc-rs: A Unique Hybrid Logical Clock for Rust | GitHub
- Logical Physical Clocks Paper (PDF)
- Evolving Clock Sync in Distributed Databases | YugabyteDB
Append-Only Systems
- Append-Only Logs: The Immutable Diary of Data | Medium
- Immutability Changes Everything | ACM Queue
- The Rise of Immutable Data Stores | ODBMS.org
- Data Replication in Distributed Systems | Candost's Blog
Sharding
- System Design Concepts: Database Sharding Strategies
- Sharding strategies: directory-based, range-based, and hash-based | PlanetScale
- The Ultimate Guide to Consistent Hashing | Toptal
- 4 Data Sharding Strategies We Analyzed When Building YugabyteDB
- RDF Triple Store Architectures (PDF)
Raft & Consensus
- Raft Consensus Algorithm
- Fast Raft: Optimizations to the Raft Consensus Protocol | arXiv
- Write Buffering to Reduce Raft Consensus Latency in YugabyteDB
- An Improved Raft Consensus Algorithm Based on Asynchronous Batch Processing | Springer
Rust Implementations
- tikv/raft-rs: Raft distributed consensus algorithm implemented in Rust | GitHub
- Implement Raft in Rust | TiKV Blog
- raft crate | crates.io
CRDTs
- Conflict-free Replicated Data Types | CRDT Tech
- How CRDTs and Rust are revolutionizing distributed systems
- CRDT Implementations | CRDT Tech
Kafka & Replicated Logs
- Kafka Replication | Confluent Documentation
- Append-only Log: The core of Kafka | Medium
- Building a Distributed Log from Scratch, Part 2 | Brave New Geek
- The Log: What every software engineer should know | LinkedIn Engineering
Leader Election
- Leader election in distributed systems | AWS Builders' Library
- Top Leader Election Algorithms in Distributed Databases | ByteByteGo
- Single Leader Replication | Medium