Distributed Write Path Research: CockroachDB/Spanner-Style for Episteme

Date: 2026-02-01
Status: Research Complete
Target: Design simplest correct distributed write path for append-only knowledge graph


Executive Summary

Research into Google Spanner, CockroachDB, Hybrid Logical Clocks, and distributed append-only systems indicates that Episteme can use a significantly simpler architecture than traditional distributed databases because:

  1. Assertions are immutable - no read-write conflicts
  2. Content-addressed - automatic deduplication
  3. Read-time conflict resolution - no distributed transactions needed
  4. Natural partition keys - subject/predicate provide excellent sharding boundaries

Recommendation: Use Hybrid Logical Clocks (HLC) + Single-Writer-Per-Partition + Simple Raft Replication instead of full Spanner-style distributed transactions.


1. Google Spanner's Architecture

Core Components

TrueTime API

  • GPS + atomic clock synchronization providing <1ms uncertainty in 99th percentile
  • Returns time interval [earliest, latest] guaranteed to contain actual time
  • Enables external consistency without distributed coordination

External Consistency Guarantee

  • If transaction T1 completes before T2 starts committing, then timestamp(T2) > timestamp(T1)
  • Achieved via commit-wait: system waits until TT.now().earliest > commit_timestamp
  • Guarantees serializable transactions visible globally
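
As a rough illustration of commit-wait, the sketch below models a TrueTime reading as an interval, picks the commit timestamp at `latest`, and reports when the wait is over. The names `TtInterval`, `commit_timestamp`, and `commit_wait_done` are hypothetical and are not Spanner's API; the values are arbitrary milliseconds.

```rust
/// Hypothetical stand-in for a TrueTime reading: real time lies in [earliest, latest].
#[derive(Clone, Copy, Debug)]
pub struct TtInterval {
    pub earliest: u64,
    pub latest: u64,
}

/// Choose a commit timestamp that is no smaller than any possible current time.
pub fn commit_timestamp(now: TtInterval) -> u64 {
    now.latest
}

/// Commit-wait is over once the timestamp is guaranteed to be in the past,
/// i.e. the earliest possible current time has moved beyond it.
pub fn commit_wait_done(now: TtInterval, commit_ts: u64) -> bool {
    now.earliest > commit_ts
}
```

The wait lasts roughly as long as the interval is wide, which is why a small uncertainty bound matters so much for Spanner's write latency.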

Paxos Groups

  • Data partitioned into ranges, each range replicated via Paxos
  • Writes flow: Client → Gateway → Paxos Leader → Replicas
  • Read-only transactions skip locks (use snapshot isolation)

Key Insight for Episteme: Spanner's complexity stems from mutable records requiring distributed transactions. We don't have mutable records.


2. CockroachDB's Distributed Write Path

Architecture Layers

Client Request
    ↓
Transaction Coordinator (TxnCoordSender)
    ↓
Distribution Layer (DistSender) - routes to range leaseholder
    ↓
Replication Layer (Raft) - consensus among replicas
    ↓
Storage Layer (Pebble; RocksDB before v20.2) - persistent KV store

Key Optimizations

Parallel Commits

  • Reduces commit latency by 50% (from 2 consensus rounds to 1)
  • Introduces STAGING transaction status
  • Writes final batch and transaction record in parallel
  • Improved p50 latency by 47%, throughput by 72% in benchmarks

Transaction Pipelining

  • Combines with parallel commits for near-theoretical minimum latency
  • Total latency = sum(read latencies) + 1 consensus round
  • Trade-off: Extra WAL write for STAGING status

MultiRaft

  • Each range has own consensus group
  • Nodes participate in thousands of concurrent Raft groups
  • Leaseholder serves reads, coordinates writes

DistSender Intelligence

  • Batches requests to same range
  • Routes to current leaseholder (cached)
  • Handles range splits/merges transparently

Key Insight for Episteme: CockroachDB's complexity handles multi-statement ACID transactions across ranges. Single assertion writes are much simpler.


3. Hybrid Logical Clocks (HLC)

Why HLC Instead of TrueTime

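TrueTime requires GPS/atomic clocks (expensive infrastructure). HLC runs on NTP-synchronized clocks plus logical counters. It preserves causal ordering and stays close to physical time, but it does not bound clock uncertainty the way TrueTime does, so it cannot by itself order causally unrelated events on different nodes by real time.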

HLC Structure

struct HLC {
    physical: u64,  // Wall clock time (NTP-synchronized)
    logical: u64,   // Counter for same physical timestamp
    node_id: u64,   // Tie-breaker for total ordering
}

Properties

  1. Causality Preservation: If event A happens-before event B, then HLC(A) < HLC(B)
  2. Close to Physical Time: Physical component stays within NTP bounds (~10-100ms typically)
  3. Monotonic: Timestamps never decrease, even with clock skew
  4. No Clock Updates: HLC reads but never modifies system clock

How It Works

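use std::time::{SystemTime, UNIX_EPOCH};

// Per-node clock state: the components of the last timestamp issued
struct HlcClock {
    physical: u64,
    logical: u64,
    node_id: u64,
}

impl HlcClock {
    // On local event (or before sending a message)
    fn generate_timestamp(&mut self) -> HLC {
        let now = system_time();
        if now > self.physical {
            self.physical = now;
            self.logical = 0;
        } else {
            // Wall clock stalled or went backwards: keep physical, bump the counter
            self.logical += 1;
        }
        HLC { physical: self.physical, logical: self.logical, node_id: self.node_id }
    }

    // On receiving message with remote timestamp
    fn update_on_receive(&mut self, remote_hlc: &HLC) -> HLC {
        let now = system_time();
        let physical = now.max(remote_hlc.physical).max(self.physical);
        self.logical = if physical == self.physical && physical == remote_hlc.physical {
            // Both share the winning physical time: exceed both counters
            self.logical.max(remote_hlc.logical) + 1
        } else if physical == self.physical {
            self.logical + 1
        } else if physical == remote_hlc.physical {
            remote_hlc.logical + 1
        } else {
            0
        };
        self.physical = physical;
        HLC { physical, logical: self.logical, node_id: self.node_id }
    }
}

// Wall-clock milliseconds since the Unix epoch
fn system_time() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).map_or(0, |d| d.as_millis() as u64)
}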
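
To see why the receive rule preserves causality under clock skew, the following self-contained sketch re-implements the two rules with the wall-clock reading passed in as a parameter, so skew can be simulated. The `Hlc` and `Clock` types and their method names are illustrative only and are not the `uhlc` API.

```rust
/// Timestamp ordered by (physical, logical, node_id); the derive relies on field order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub physical: u64,
    pub logical: u64,
    pub node_id: u64,
}

/// Clock whose wall-clock reading is supplied by the caller (to simulate skew).
pub struct Clock {
    last: Hlc,
}

impl Clock {
    pub fn new(node_id: u64) -> Self {
        Clock { last: Hlc { physical: 0, logical: 0, node_id } }
    }

    /// Local event or send.
    pub fn tick(&mut self, wall: u64) -> Hlc {
        if wall > self.last.physical {
            self.last.physical = wall;
            self.last.logical = 0;
        } else {
            self.last.logical += 1;
        }
        self.last
    }

    /// Receive: merge the remote timestamp so the result exceeds both inputs.
    pub fn receive(&mut self, wall: u64, remote: Hlc) -> Hlc {
        let physical = wall.max(remote.physical).max(self.last.physical);
        let logical = if physical == self.last.physical && physical == remote.physical {
            self.last.logical.max(remote.logical) + 1
        } else if physical == self.last.physical {
            self.last.logical + 1
        } else if physical == remote.physical {
            remote.logical + 1
        } else {
            0
        };
        self.last.physical = physical;
        self.last.logical = logical;
        self.last
    }
}
```

If node A stamps a message at wall time 1000 and node B receives it while its own wall clock reads 900, B adopts A's physical component and bumps the logical counter, so B's timestamp still sorts after A's even though B's clock lags.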

Clock Skew Handling

  • CockroachDB default max offset: 500ms
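  • Nodes shut themselves down when their clock drifts too far from the rest of the cluster (CockroachDB does so at 80% of the max offset, measured against at least half of the other nodes)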
  • NTP typically achieves <10ms on LAN, <100ms on WAN

Rust Implementations

  1. uhlc-rs (atolab/uhlc-rs)

    • Production-ready, used in distributed systems
    • Unique identifier per clock
    • Monotonic timestamps close to physical time
  2. hlc-rs (tbg/hlc-rs)

    • Simpler implementation
    • Good starting point
  3. hlc-gen

    • Lock-free implementation
    • #[no_std] compatible
    • High throughput

Recommendation: Start with uhlc-rs - battle-tested and well-documented.


4. The Append-Only Advantage

What We Skip

No Distributed Transactions

  • Single assertion write = single consensus operation
  • No two-phase commit
  • No transaction coordinator
  • No deadlock detection

No Read-Write Conflicts

  • Assertions never mutate
  • No need for pessimistic locking
  • No MVCC tombstones
  • Reads never block writes

No Transaction Isolation Complexity

  • No serializable snapshot isolation
  • No read-write conflict tracking
  • No transaction retry logic

Automatic Deduplication

  • Content addressing: identical assertions have same ID
  • Write is idempotent: store(hash(content), content)
  • Replaying writes is safe
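
The idempotence claim can be illustrated with a small in-memory store keyed by a content hash. This is a sketch only: `ContentStore` is a hypothetical type, and FNV-1a stands in for BLAKE3 purely so the example needs no external crates.

```rust
use std::collections::HashMap;

/// Stand-in content hash (FNV-1a, 64-bit). A real deployment would use BLAKE3
/// as described in this document.
fn content_hash(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

#[derive(Default)]
pub struct ContentStore {
    entries: HashMap<u64, Vec<u8>>,
}

impl ContentStore {
    /// Idempotent write: the key is derived from the content, so storing the
    /// same bytes again leaves the store unchanged.
    pub fn store(&mut self, content: &[u8]) -> u64 {
        let id = content_hash(content);
        self.entries.entry(id).or_insert_with(|| content.to_vec());
        id
    }

    /// Number of distinct assertions held.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Because a replayed write lands on the same key with the same bytes, a Raft log can be re-applied after a crash without any duplicate-detection logic.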

What We Keep

Ordering Guarantees

  • Supersession chains require causal ordering
  • HLC provides total ordering across cluster
  • Example: Assertion A2 supersedes A1, must ensure timestamp(A2) > timestamp(A1)

Durability

  • Still need WAL + fsync
  • Still need replication for fault tolerance
  • Still need crash recovery

Consistency

  • Raft consensus ensures replicas agree
  • Leader election on failure
  • No split-brain scenarios

Comparison: Full Spanner vs Episteme

| Feature | Spanner/CockroachDB | Episteme (Append-Only) |
|---|---|---|
| Write path | TxnCoordSender → DistSender → Raft | Direct → Raft |
| Transaction coordinator | Required | Not needed |
| Distributed locks | Yes (pessimistic) | No locks needed |
| Two-phase commit | Yes | No |
| Intent resolution | Complex | Not applicable |
| Read-write conflicts | Must detect and resolve | Cannot occur |
| Write amplification | High (intents + resolution) | Low (direct append) |
| Latency | 2-3 network hops | 1 network hop |

Key Insight: Immutability eliminates entire classes of distributed systems problems.


5. Sharding Strategy

Options Analysis

5.1 Hash-Based Sharding (by Assertion ID)

Shard = BLAKE3(assertion_content) % num_shards

Pros:

  • Uniform distribution
  • No hotspots
  • Simple implementation

Cons:

  • Related assertions (same subject) scattered across shards
  • Query "all claims about subject X" requires scatter-gather
  • No data locality

Verdict: Poor fit for knowledge graph queries


5.2 Range-Based Sharding (by Subject)

Shard_A: subjects [aaa, bbb)
Shard_B: subjects [bbb, ccc)
Shard_C: subjects [ccc, zzz]

Pros:

  • All assertions about subject X on same shard
  • Range queries efficient
  • Good for "tell me everything about X"

Cons:

  • Hotspot subjects (e.g., "USA", "COVID-19")
  • Manual range assignment
  • Complex rebalancing

Verdict: ⚠️ Good for queries, problematic for hot subjects


5.3 Hash-Based Sharding (by Subject)

Shard = hash(subject_uri) % num_shards

Pros:

  • All assertions about subject X on same shard
  • Automatic load distribution
  • No manual range management
  • Simple shard assignment

Cons:

  • Predicate-based queries still scatter-gather
  • Rebalancing requires re-hashing (use consistent hashing)

Verdict: Best balance for Episteme


5.4 Composite: Subject + Predicate Hash

Shard = hash(subject_uri || predicate_uri) % num_shards

Pros:

  • Even more specific data locality
  • "Subject X with predicate P" queries hit single shard

Cons:

  • "All claims about subject X" now scatters
  • More complex query routing

Verdict: ⚠️ Only if predicate-specific queries dominate workload


Recommendation

Primary: Hash-based sharding by Subject

fn assign_shard(assertion: &Assertion, num_shards: u32) -> u32 {
    let hash = blake3::hash(assertion.subject.as_bytes());
    u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}

With Consistent Hashing for Rebalancing:

Use jump hash or consistent hash ring to minimize data movement when adding/removing shards.

// Jump hash provides minimal key redistribution
fn jump_hash(key: u64, num_buckets: u32) -> u32 {
    let mut b = -1i64;
    let mut j = 0i64;
    let mut k = key;
    while j < num_buckets as i64 {
        b = j;
        k = k.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b.wrapping_add(1)) as f64 * (f64::powi(2.0, 31) / ((k >> 33).wrapping_add(1)) as f64)) as i64;
    }
    b as u32
}
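
The "minimal redistribution" property can be checked directly: when the shard count grows from n to n+1, a key either stays where it was or moves to the new shard. The sketch below repeats the same jump hash algorithm and adds a hypothetical helper, `moved_keys`, that counts how many sample keys change shard.

```rust
/// Jump consistent hash (Lamping and Veach), same algorithm as the function above.
pub fn jump_hash(key: u64, num_buckets: u32) -> u32 {
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    let mut k = key;
    while j < i64::from(num_buckets) {
        b = j;
        k = k.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((k >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

/// Count the keys in 0..sample whose shard changes when one shard is added.
pub fn moved_keys(sample: u64, old_shards: u32) -> u64 {
    (0..sample)
        .filter(|&k| jump_hash(k, old_shards) != jump_hash(k, old_shards + 1))
        .count() as u64
}
```

In expectation about 1/(n+1) of keys move, compared with nearly all keys under plain `hash % num_shards`. One limitation to keep in mind: jump hash only supports adding or removing the highest-numbered shard, so retiring an arbitrary shard needs a hash ring or an indirection table.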


6. Write Amplification and Performance

Raft Write Overhead

Standard Raft Write Path:

  1. Client sends write to leader
  2. Leader appends to local WAL (fsync)
  3. Leader sends AppendEntries RPC to followers
  4. Followers append to local WAL (fsync)
  5. Followers respond to leader
  6. Leader commits when quorum reached
  7. Leader responds to client

Cost Analysis:

  • Network: 1 round-trip (leader ↔ followers)
  • Disk: 1 fsync at leader + (replication_factor - 1) fsyncs at followers
  • Write Amplification: replication_factor (typically 3x-5x)
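
The cost analysis above reduces to a few lines of arithmetic. The sketch below is illustrative only; `WriteCost` and `write_cost` are hypothetical names, and the model ignores log compaction and index writes.

```rust
/// Per-write cost of one Raft commit in a group of `replicas` nodes.
#[derive(Debug, PartialEq, Eq)]
pub struct WriteCost {
    /// Nodes that must persist the entry before it commits.
    pub quorum: u32,
    /// Follower acknowledgements the leader waits for (the leader counts itself).
    pub follower_acks: u32,
    /// Every replica eventually persists the entry.
    pub total_fsyncs: u32,
    /// Payload bytes written across all replicas.
    pub bytes_written: u64,
}

pub fn write_cost(replicas: u32, payload_bytes: u64) -> WriteCost {
    let quorum = replicas / 2 + 1;
    WriteCost {
        quorum,
        follower_acks: quorum - 1,
        total_fsyncs: replicas,
        bytes_written: payload_bytes * u64::from(replicas),
    }
}
```

With 3 replicas the leader needs only one follower acknowledgement before responding, so commit latency tracks the faster of the two followers rather than the slower one.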

Batching Optimizations

Batch AppendEntries: Instead of sending RPC per write, batch N writes into single RPC.

// Naive: 1000 writes = 1000 RPCs
for write in writes {
    leader.append_entries(vec![write]).await?;
}

// Optimized: 1000 writes = 1 RPC
leader.append_entries(writes).await?;

Performance Impact (from YugabyteDB):

  • Throughput: 2x improvement with batching
  • Latency: 30% reduction for p99

Write Buffering: Buffer writes in memory, flush when:

  • Buffer reaches size threshold (e.g., 1MB)
  • Time threshold reached (e.g., 10ms)
  • Explicit flush request

Trade-off: Latency vs throughput

  • Short flush window (10ms): Lower latency, moderate throughput
  • Long flush window (100ms): Higher latency, maximum throughput
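
The three flush conditions can be sketched as a small buffer type. This is a minimal illustration, not the planned implementation: `WriteBuffer` and its methods are hypothetical, and the current time is passed in explicitly so the behaviour is deterministic.

```rust
use std::time::{Duration, Instant};

pub struct WriteBuffer {
    pending: Vec<Vec<u8>>,
    pending_bytes: usize,
    oldest: Option<Instant>, // arrival time of the first buffered write
    max_bytes: usize,
    max_delay: Duration,
}

impl WriteBuffer {
    pub fn new(max_bytes: usize, max_delay: Duration) -> Self {
        WriteBuffer { pending: Vec::new(), pending_bytes: 0, oldest: None, max_bytes, max_delay }
    }

    /// Buffer a write; returns a batch if the size threshold was reached.
    pub fn push(&mut self, write: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.oldest.is_none() {
            self.oldest = Some(now);
        }
        self.pending_bytes += write.len();
        self.pending.push(write);
        if self.pending_bytes >= self.max_bytes {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Called periodically; returns a batch if the oldest write has waited long enough.
    pub fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.oldest {
            Some(t) if now.duration_since(t) >= self.max_delay => Some(self.flush()),
            _ => None,
        }
    }

    /// Explicit flush: hand back everything buffered so far.
    pub fn flush(&mut self) -> Vec<Vec<u8>> {
        self.pending_bytes = 0;
        self.oldest = None;
        std::mem::take(&mut self.pending)
    }
}
```

Each returned batch would become a single Raft proposal, so `max_delay` is the upper bound on the latency that batching adds to any one write.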

Pipeline Optimization

CockroachDB's Parallel Commits Approach:

Standard 2-phase commit:

Write intents → Commit record → Resolve intents
   (1 RTT)         (1 RTT)         (async)
Total: 2 RTT

Parallel commits:

Write intents + Commit record (parallel)
   (1 RTT)
Total: 1 RTT

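Episteme Equivalent: We don't have intents, but the leader can overlap its two local steps. Neither step is a consensus round on its own; the saving is in local I/O latency, and readers must tolerate an index entry whose WAL append later failed:

// Sequential: WAL append, then index update
let wal_entry = wal.append(&assertion).await?;
let index_update = index.update(&assertion).await?;

// Pipelined: both run concurrently; try_join! returns the first error
// (both futures must share an error type)
let (wal_entry, index_update) = tokio::try_join!(
    wal.append(&assertion),
    index.update(&assertion),
)?;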

Expected Performance Numbers

Local Write (Single Node):

  • Latency: 1-5ms (dominated by fsync)
  • Throughput: 10K-50K writes/sec (NVMe SSD)

Distributed Write (3 Replicas, Same DC):

  • Latency: 5-15ms (1 RTT + fsync)
  • Throughput: 5K-20K writes/sec per shard

Distributed Write (3 Replicas, Multi-Region):

  • Latency: 50-200ms (depends on geography)
  • Throughput: 1K-5K writes/sec per shard

Batched Writes (100 assertions/batch):

  • Throughput: 100K-500K assertions/sec per shard

With 100 Shards:

  • Aggregate throughput: 10M-50M assertions/sec (100 × the batched per-shard estimate, assuming subjects hash evenly across shards)


7. Concrete Architecture Proposal

The Simplest Correct Design

┌─────────────────────────────────────────────────────────┐
│                     Agent (Writer)                       │
│                                                          │
│  1. Generate Assertion                                   │
│  2. Sign with Ed25519                                    │
│  3. Compute Content Hash (BLAKE3)                        │
└────────────────┬────────────────────────────────────────┘
                 │ HTTP POST /assertions
                 ▼
┌─────────────────────────────────────────────────────────┐
│                   Gateway Node (Any)                     │
│                                                          │
│  1. Validate signature                                   │
│  2. Compute shard: hash(subject) % num_shards            │
│  3. Route to shard leader                                │
└────────────────┬────────────────────────────────────────┘
                 │ gRPC → Shard Leader
                 ▼
┌─────────────────────────────────────────────────────────┐
│               Shard N (3-5 Replicas)                     │
│                                                          │
│  ┌──────────────────────────────────────────┐           │
│  │           Leader Node                    │           │
│  │                                          │           │
│  │  1. Generate HLC timestamp               │           │
│  │  2. Append to WAL (fsync)                │           │
│  │  3. Propose to Raft group                │◄──────────┤── Raft RPCs
│  │  4. Wait for quorum                      │           │
│  │  5. Apply to KV store                    │           │
│  │  6. Update indexes (async)               │           │
│  │  7. Respond to gateway                   │           │
│  └──────────────────────────────────────────┘           │
│                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │  Follower 1  │  │  Follower 2  │  │  Follower 3  │  │
│  │              │  │              │  │              │  │
│  │ - Replicate  │  │ - Replicate  │  │ - Replicate  │  │
│  │ - Apply      │  │ - Apply      │  │ - Apply      │  │
│  │ - Ready for  │  │ - Ready for  │  │ - Ready for  │  │
│  │   failover   │  │   failover   │  │   failover   │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└─────────────────────────────────────────────────────────┘

Component Breakdown

7.1 Gateway Layer (Stateless)

Responsibilities:

  • Accept HTTP/gRPC requests
  • Authenticate agents (verify Ed25519 signatures)
  • Route requests to correct shard
  • Cache shard topology
  • Return responses to clients

Implementation:

use std::sync::Arc;

use axum::{extract::State, http::StatusCode, Json};
use tokio::sync::RwLock;

#[derive(Clone)] // axum requires handler state to be Clone
struct Gateway {
    shard_map: Arc<RwLock<ShardMap>>,
    http_client: reqwest::Client,
}

async fn handle_assertion(
    State(gateway): State<Gateway>,
    Json(assertion): Json<Assertion>,
) -> Result<StatusCode, ApiError> {
    // 1. Validate signature
    assertion.verify_signature()?;

    // 2-3. Compute the shard and find its leader under a single read lock,
    //      released before the network call
    let leader_addr = {
        let shard_map = gateway.shard_map.read().await;
        let shard_id = compute_shard(&assertion.subject, shard_map.num_shards);
        shard_map.get_leader(shard_id)?
    };

    // 4. Forward to leader; a non-2xx reply is an error, not a success
    gateway.http_client
        .post(format!("{}/internal/append", leader_addr))
        .json(&assertion)
        .send()
        .await?
        .error_for_status()?;

    Ok(StatusCode::CREATED)
}

fn compute_shard(subject: &str, num_shards: u32) -> u32 {
    let hash = blake3::hash(subject.as_bytes());
    u32::from_le_bytes(hash.as_bytes()[0..4].try_into().unwrap()) % num_shards
}

Scaling: Run many gateway instances behind load balancer.


7.2 Shard Leader (Stateful)

Responsibilities:

  • Accept writes for this shard's key range
  • Generate HLC timestamps
  • Append to WAL
  • Replicate via Raft
  • Apply committed entries to storage
  • Update indexes
  • Handle leader election

Implementation:

use std::sync::Arc;

use raft::{storage::MemStorage, RawNode}; // tikv/raft-rs is published as the `raft` crate
use tokio::sync::Mutex;
use uhlc::HLC;

struct ShardLeader {
    shard_id: u32,
    hlc: Arc<Mutex<HLC>>,
    wal: WriteAheadLog,
    kv_store: KVStore,
    raft_node: Arc<Mutex<RawNode<MemStorage>>>, // propose() needs &mut access
    index_store: IndexStore,
}

impl ShardLeader {
    async fn append_assertion(&self, assertion: Assertion) -> Result<AssertionId, StorageError> {
        // 1. Derive the ID from the assertion content only. Hashing the HLC
        //    timestamp as well would give identical assertions different IDs
        //    and defeat deduplication.
        let assertion_id = AssertionId::from_hash(blake3::hash(&serialize(&assertion)?));

        // 2. Generate HLC timestamp
        let timestamp = self.hlc.lock().await.new_timestamp();

        // 3. Create versioned assertion
        let versioned = VersionedAssertion {
            assertion,
            timestamp,
            shard_id: self.shard_id,
        };

        // 4. Serialize
        let bytes = serialize(&versioned)?;

        // 5. Propose to Raft (this does WAL append + replication). propose()
        //    returns no handle, so the assertion ID travels as the entry
        //    context and lets the apply loop signal completion.
        self.raft_node
            .lock()
            .await
            .propose(assertion_id.as_bytes().to_vec(), bytes.clone())?;

        // 6. Wait for commit (via channel or polling)
        self.wait_for_commit(&assertion_id).await?;

        // 7. Apply to KV store (idempotent)
        self.kv_store.put(assertion_id.as_bytes(), &bytes)?;

        // 8. Update indexes (async, best-effort)
        tokio::spawn({
            let index_store = self.index_store.clone();
            let versioned = versioned.clone();
            async move {
                let _ = index_store.index_assertion(&versioned).await;
            }
        });

        Ok(assertion_id)
    }
}

7.3 Raft Replication (tikv/raft-rs)

Configuration:

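use raft::{storage::MemStorage, Config, RawNode};
use slog::{o, Discard, Logger}; // raft 0.7 takes an slog logger

fn create_raft_node(node_id: u64, peers: Vec<u64>) -> raft::Result<RawNode<MemStorage>> {
    let config = Config {
        id: node_id,
        election_tick: 10,
        heartbeat_tick: 3,
        max_size_per_msg: 1024 * 1024, // 1MB
        max_inflight_msgs: 256,
        ..Default::default()
    };
    config.validate()?;

    // Initial voters come from the storage's ConfState, not from RawNode::new.
    // MemStorage keeps the log in memory only; a durable deployment needs a
    // Storage implementation backed by the WAL.
    let storage = MemStorage::new_with_conf_state((peers, Vec::<u64>::new()));
    let logger = Logger::root(Discard, o!());
    RawNode::new(&config, storage, &logger)
}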

Raft Message Handling:

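// Written against the raft 0.7 Ready API (accessor methods, LightReady).
// Snapshot handling is omitted; persist_hard_state is a placeholder helper
// like persist_entries, send_to_peer, and apply_entry.
async fn run_raft_loop(raft_node: Arc<Mutex<RawNode<MemStorage>>>) {
    let mut ticker = tokio::time::interval(Duration::from_millis(100));

    loop {
        ticker.tick().await;

        let mut node = raft_node.lock().await;
        node.tick();

        // Process ready entries
        if !node.has_ready() {
            continue;
        }
        let mut ready = node.ready();

        // 1. Send messages that do not depend on persistence
        for msg in ready.take_messages() {
            send_to_peer(msg).await;
        }

        // 2. Apply committed entries
        for entry in ready.take_committed_entries() {
            apply_entry(entry).await;
        }

        // 3. Persist new entries and the hard state (term, vote, commit) to WAL
        if !ready.entries().is_empty() {
            persist_entries(ready.entries()).await;
        }
        if let Some(hard_state) = ready.hs() {
            persist_hard_state(hard_state).await;
        }

        // 4. Only now send the messages that required persistence
        for msg in ready.take_persisted_messages() {
            send_to_peer(msg).await;
        }

        // 5. Advance Raft state and handle whatever advancing released
        let mut light_ready = node.advance(ready);
        for msg in light_ready.take_messages() {
            send_to_peer(msg).await;
        }
        for entry in light_ready.take_committed_entries() {
            apply_entry(entry).await;
        }
        node.advance_apply();
    }
}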

7.4 Storage Layer

Write-Ahead Log:

use std::io;

use tokio::{fs::File, io::AsyncWriteExt}; // write_all comes from AsyncWriteExt

struct WriteAheadLog {
    file: File,
    current_offset: u64,
}

impl WriteAheadLog {
    async fn append(&mut self, entry: &RaftEntry) -> Result<u64, io::Error> {
        let bytes = serialize(entry)?;
        // The length prefix is a u32, so refuse entries that would not fit
        let len = u32::try_from(bytes.len())
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "WAL entry too large"))?;
        let checksum = crc32c::crc32c(&bytes);

        // Write: [len: u32][checksum: u32][data: bytes]
        let record = [
            &len.to_le_bytes()[..],
            &checksum.to_le_bytes()[..],
            &bytes[..],
        ].concat();

        self.file.write_all(&record).await?;
        self.file.sync_all().await?; // fsync

        let offset = self.current_offset;
        self.current_offset += record.len() as u64;
        Ok(offset)
    }
}
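
Crash recovery needs the inverse of this framing: read a record back, verify the checksum, and stop at the first truncated or corrupt record. The sketch below assumes the same `[len][checksum][data]` layout; `encode_record` and `decode_record` are hypothetical helpers, and the bitwise CRC-32C is included only to keep the example dependency-free.

```rust
use std::convert::TryInto;

/// Bitwise CRC-32C (Castagnoli). Slow but dependency-free; the `crc32c` crate
/// computes the same checksum with hardware acceleration.
pub fn crc32c(data: &[u8]) -> u32 {
    let mut crc = !0u32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 == 1 { (crc >> 1) ^ 0x82F6_3B78 } else { crc >> 1 };
        }
    }
    !crc
}

/// Frame a payload as [len: u32 LE][checksum: u32 LE][data].
pub fn encode_record(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(8 + data.len());
    out.extend_from_slice(&(data.len() as u32).to_le_bytes());
    out.extend_from_slice(&crc32c(data).to_le_bytes());
    out.extend_from_slice(data);
    out
}

/// Decode the record at the start of `buf`. Returns the payload and the number
/// of bytes consumed, or None if the record is truncated or fails its checksum.
pub fn decode_record(buf: &[u8]) -> Option<(&[u8], usize)> {
    let len = u32::from_le_bytes(buf.get(0..4)?.try_into().ok()?) as usize;
    let checksum = u32::from_le_bytes(buf.get(4..8)?.try_into().ok()?);
    let end = 8usize.checked_add(len)?;
    let data = buf.get(8..end)?;
    if crc32c(data) == checksum {
        Some((data, end))
    } else {
        None
    }
}
```

A recovery loop would call `decode_record` repeatedly, advancing by the consumed length, and truncate the file at the first `None`, since a torn final write is the expected result of a crash during `append`.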

KV Store (RocksDB):

use rocksdb::{DB, Options, WriteBatch};

struct KVStore {
    db: Arc<DB>,
}

impl KVStore {
    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
        self.db.put(key, value)
    }

    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        self.db.get(key)
    }

    fn batch_put(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), rocksdb::Error> {
        let mut batch = WriteBatch::default();
        for (key, value) in entries {
            batch.put(key, value);
        }
        self.db.write(batch)
    }
}

7.5 Index Store (Async Updates)

Subject Index:

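struct SubjectIndex {
    db: Arc<DB>,
}

impl SubjectIndex {
    async fn index_assertion(&self, assertion: &VersionedAssertion) -> Result<(), StorageError> {
        // Key layout: subject:<subject>:<timestamp>. The timestamp must render
        // as a fixed-width string for key order to match time order, and the
        // ':' separator must be escaped in subjects (URIs contain it), or one
        // subject's prefix can match another subject's keys.
        let subject_key = format!("subject:{}:{}", assertion.assertion.subject, assertion.timestamp);
        self.db.put(subject_key.as_bytes(), assertion.assertion.id.as_bytes())?;
        Ok(())
    }

    async fn query_subject(&self, subject: &str, limit: usize) -> Result<Vec<AssertionId>, StorageError> {
        let prefix = format!("subject:{}:", subject);
        let mut results = Vec::new();

        // rocksdb 0.21 iterators yield Result items
        for item in self.db.prefix_iterator(prefix.as_bytes()).take(limit) {
            let (key, value) = item?;
            // Without a configured prefix extractor the iterator can run past the prefix
            if !key.starts_with(prefix.as_bytes()) {
                break;
            }
            results.push(AssertionId::from_bytes(&value)?);
        }

        Ok(results)
    }
}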

7.6 Failure Handling

Leader Failure:

  1. Followers detect missing heartbeats (election_timeout)
  2. Follower transitions to candidate, increments term
  3. Candidate requests votes from peers
  4. Majority votes → becomes new leader
  5. New leader sends heartbeats
  6. Gateway detects new leader (via Raft status API)

Follower Failure:

  1. Leader detects missing ACKs
  2. Leader continues with remaining quorum
  3. When follower recovers, catches up via log replication

Network Partition:

  1. Minority partition cannot elect leader (no quorum)
  2. Minority partition rejects writes
  3. Majority partition continues serving writes
  4. When partition heals, minority replays majority's log
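
The partition behaviour follows from the majority rule alone, which a few lines can make concrete. `has_quorum` and `writable_sides` are hypothetical helper names used only for this illustration.

```rust
/// True if a group of `reachable` nodes (including the local one) out of
/// `cluster_size` can elect a leader and commit writes.
pub fn has_quorum(reachable: usize, cluster_size: usize) -> bool {
    reachable > cluster_size / 2
}

/// For a partition described by the size of each side, report which sides
/// can still accept writes. At most one side can.
pub fn writable_sides(side_sizes: &[usize]) -> Vec<bool> {
    let cluster_size: usize = side_sizes.iter().sum();
    side_sizes.iter().map(|&n| has_quorum(n, cluster_size)).collect()
}
```

A 3/2 split of five nodes leaves the larger side writable, while a 2/2 split of four nodes leaves neither side writable. That is one reason to prefer odd replica counts per shard.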

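Consistency Guarantee: Raft gives each shard a single agreed log order, so writes to a shard are linearizable. Reads are linearizable only when served by the leader after it confirms leadership (for example via a read index or lease); reads from followers may be stale.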


7.7 What We Skip

No Transaction Coordinator:

  • Single assertion = single Raft proposal
  • No multi-shard transactions

No Intent Resolution:

  • CockroachDB writes "intents" then resolves them
  • We write final values directly

No Read-Write Conflict Detection:

  • Assertions are immutable
  • No conflict possible

No Distributed Locks:

  • No lock table
  • No deadlock detection

No MVCC Garbage Collection:

  • Assertions never deleted
  • No tombstones to clean up

8. Rust Crates to Use

Core Dependencies

[dependencies]
# Consensus
raft = "0.7"                    # TiKV's Raft implementation
raft-proto = "0.7"              # Protobuf definitions

# Hybrid Logical Clock
uhlc = "0.5"                    # Battle-tested HLC

# Storage
rocksdb = "0.21"                # Embedded KV store
sled = "0.34"                   # Alternative: pure-Rust KV store

# Serialization
rkyv = "0.7"                    # Zero-copy serialization (already using)
blake3 = "1.5"                  # Content addressing (already using)

# Async runtime
tokio = { version = "1.35", features = ["full"] }
tokio-stream = "0.1"

# Networking
tonic = "0.10"                  # gRPC (Raft messages)
axum = "0.7"                    # HTTP gateway (already using)
tower = "0.4"                   # Middleware

# Error handling
anyhow = "1.0"
thiserror = "1.0"

# Observability
tracing = "0.1"                 # Already using
tracing-subscriber = "0.3"
metrics = "0.21"

Optional (Future)

# If we need service discovery
etcd-client = "0.12"

# If we want built-in consensus
tikv-client = "0.2"             # Use TiKV as storage backend

# If we need distributed tracing
opentelemetry = "0.21"
opentelemetry-jaeger = "0.20"

9. Implementation Phases

Phase 1: Single-Node with HLC (1-2 weeks)

Goal: Prove HLC integration + baseline performance

// Extend current ingestion with HLC
struct IngestionPipeline {
    hlc: Arc<Mutex<HLC>>,
    wal: WriteAheadLog,
    kv_store: KVStore,
}

impl IngestionPipeline {
    async fn ingest(&self, assertion: Assertion) -> Result<AssertionId, Error> {
        let timestamp = self.hlc.lock().await.new_timestamp();
        let versioned = VersionedAssertion { assertion, timestamp };

        // Existing WAL append logic
        self.wal.append(&versioned).await?;

        // Existing KV store logic
        let id = AssertionId::from_content(&versioned);
        self.kv_store.put(id.as_bytes(), &serialize(&versioned)?)?;

        Ok(id)
    }
}

Tests:

  • HLC monotonicity under concurrent writes
  • HLC causality preservation (ingest A, ingest B superseding A, verify timestamps)
  • Performance baseline (writes/sec, p99 latency)

Phase 2: Single-Shard with Raft Replication (3-4 weeks)

Goal: Fault-tolerant single shard (3 replicas)

Architecture:

Writer → Leader (with Raft) → Followers (2x)

Implementation:

  1. Wrap IngestionPipeline with Raft layer
  2. Leader proposes writes to Raft group
  3. Followers replicate and apply
  4. Leader responds when committed

Tests:

  • Leader failure: kill leader, verify follower elected, writes continue
  • Follower failure: kill follower, verify writes continue with 2/3 quorum
  • Recovery: restart failed node, verify catches up
  • Split-brain: network partition, verify minority rejects writes

Phase 3: Multi-Shard with Gateway (4-6 weeks)

Goal: Horizontal scalability

Architecture:

Writers → Gateway Layer (stateless, N instances)
              ↓
    ┌─────────┼─────────┐
Shard 0    Shard 1    Shard 2
(3 replicas each)

Implementation:

  1. Gateway routes by hash(subject) % num_shards
  2. Shard topology stored in etcd or Raft metadata
  3. Gateway caches topology, refreshes on errors
  4. Dynamic shard addition (consistent hashing)

Tests:

  • Load balancing: verify writes distributed evenly
  • Shard failover: kill a shard's leader, verify the gateway re-routes to the newly elected leader
  • Rebalancing: add shard, verify consistent hashing moves only the expected fraction of subjects
  • Hotspot handling: send 10K writes to same subject, verify throughput

Phase 4: Production Hardening (ongoing)

Observability:

// Metrics
metrics::increment_counter!("assertions_ingested_total", "shard" => shard_id.to_string());
metrics::histogram!("raft_commit_duration_ms", commit_duration.as_millis() as f64);
metrics::gauge!("raft_leader_id", leader_id as f64);

// Tracing
#[instrument(skip(self), fields(shard_id = %self.shard_id, assertion_id = %assertion.id))]
async fn append_assertion(&self, assertion: Assertion) -> Result<()> {
    info!("Appending assertion");
    // ...
}

Chaos Testing:

  • Random node kills
  • Network latency injection
  • Disk failures
  • Clock skew simulation

Performance Tuning:

  • Batch size optimization (find sweet spot for latency vs throughput)
  • RocksDB compaction tuning
  • Raft message batching
  • Index update batching

10. Key Design Decisions Summary

| Decision | Choice | Rationale |
|---|---|---|
| Clock | Hybrid Logical Clock (HLC) | Causality + physical time without GPS clocks |
| Consensus | Raft (tikv/raft-rs) | Simple, proven, excellent Rust implementation |
| Sharding | Hash by subject | Data locality for common queries |
| Replication | 3 replicas per shard | Tolerate 1 failure (quorum = 2/3) |
| Storage | RocksDB (via rust-rocksdb) | LSM tree, proven at scale, good Rust bindings |
| Serialization | rkyv (already using) | Zero-copy, fastest option |
| Transactions | None (single-assertion writes) | Append-only eliminates need |
| Gateway | Stateless HTTP/gRPC | Easy to scale horizontally |

11. Performance Expectations

Single Shard (3 Replicas, Same Datacenter)

| Metric | No Batching | With Batching (100/batch) |
|---|---|---|
| Write Latency (p50) | 5-10ms | 5-10ms |
| Write Latency (p99) | 15-30ms | 15-30ms |
| Throughput | 5K-20K/sec | 100K-500K/sec |
| Write Amplification | 3x (replication) | 3x |

100 Shards

| Metric | Value |
|---|---|
| Aggregate Throughput | 10M-50M assertions/sec |
| Storage Efficiency | ~70% (accounting for WAL, indexes) |
| Fault Tolerance | Lose any 1 replica per shard |

Resource Requirements (per node)

| Resource | Requirement |
|---|---|
| CPU | 8-16 cores |
| RAM | 32-64 GB |
| Disk | 1-4 TB NVMe SSD |
| Network | 10 Gbps |

12. Comparison: Traditional vs Episteme

CockroachDB Transaction Flow

1. Client: BEGIN TRANSACTION
2. TxnCoordSender: Create transaction record
3. DistSender: Route to ranges, write intents
4. Raft: Replicate intents (2 RTT)
5. TxnCoordSender: Write commit record
6. Raft: Replicate commit (1 RTT)
7. TxnCoordSender: Resolve intents (async)
8. Client: Transaction complete

Total: ~3-4 RTT, complex state machine

Episteme Assertion Flow

1. Client: POST /assertions
2. Gateway: Route to shard leader
3. Leader: Propose to Raft
4. Raft: Replicate and commit (1 RTT)
5. Leader: Apply to storage
6. Gateway: Return assertion ID

Total: ~1-2 RTT, simple state machine

Latency Reduction: 50-75% vs traditional distributed database

Code Complexity: ~80% less than CockroachDB transaction coordinator


13. Open Questions & Future Work

Q1: How to handle supersession chains across shards?

Problem: Assertion A1 on shard X supersedes assertion A0 on shard Y. How to ensure ordering?

  • Option A: Require supersession to happen on same shard (reject if subjects differ)
  • Option B: Use HLC timestamps, accept eventual consistency for cross-shard supersession
  • Option C: Introduce lightweight cross-shard coordination (2PC only for supersession)

Recommendation: Start with Option B (eventual consistency). Most supersession chains will be same subject (same shard).
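
Under Option B, a reader can validate supersession links at read time by accepting a link only when the superseding assertion carries a strictly larger HLC timestamp. The sketch below shows that check on an in-memory slice; `Record` and `resolve_head` are hypothetical, and fetching records from other shards is out of scope here.

```rust
#[derive(Clone, Debug)]
pub struct Record {
    pub id: u64,
    /// HLC as (physical, logical, node_id); tuples compare lexicographically.
    pub hlc: (u64, u64, u64),
    pub supersedes: Option<u64>,
}

/// Follow supersession links from `start` to the newest assertion, accepting a
/// link only if the superseding record has a strictly larger HLC timestamp.
/// Returns None if `start` is unknown.
pub fn resolve_head(records: &[Record], start: u64) -> Option<u64> {
    let mut current = records.iter().find(|r| r.id == start)?;
    loop {
        let next = records
            .iter()
            .filter(|r| r.supersedes == Some(current.id) && r.hlc > current.hlc)
            .max_by_key(|r| r.hlc);
        match next {
            Some(r) => current = r,
            // Timestamps strictly increase along the chain, so this terminates.
            None => return Some(current.id),
        }
    }
}
```

When several assertions claim to supersede the same record, this sketch picks the one with the largest timestamp; a lens could apply a different policy at the same point.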

Q2: How to query across multiple shards efficiently?

Problem: Query "all assertions about subjects in category X" may span shards.

  • Option A: Scatter-gather (query all shards, merge results)
  • Option B: Secondary index shard (index by predicate or category)
  • Option C: Materialized views pre-aggregated by common queries

Recommendation: Implement Option A first, optimize hot queries with Option C.
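
The gather half of Option A is a k-way merge. The sketch below assumes each shard returns `(timestamp, assertion_id)` pairs already sorted by ascending timestamp, with plain `u64` values standing in for HLC timestamps and IDs; `merge_shard_results` is a hypothetical function name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge per-shard result lists (each sorted by ascending timestamp) into one
/// ascending list of at most `limit` items.
pub fn merge_shard_results(shards: Vec<Vec<(u64, u64)>>, limit: usize) -> Vec<(u64, u64)> {
    // Heap entries: (timestamp, assertion_id, shard index, position in that list).
    // Reverse turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (s, list) in shards.iter().enumerate() {
        if let Some(&(ts, id)) = list.first() {
            heap.push(Reverse((ts, id, s, 0usize)));
        }
    }

    let mut out = Vec::new();
    while out.len() < limit {
        match heap.pop() {
            Some(Reverse((ts, id, s, pos))) => {
                out.push((ts, id));
                // Refill the heap from the shard that just supplied an item.
                if let Some(&(next_ts, next_id)) = shards[s].get(pos + 1) {
                    heap.push(Reverse((next_ts, next_id, s, pos + 1)));
                }
            }
            None => break,
        }
    }
    out
}
```

Because the merge stops at `limit`, each shard only needs to return its first `limit` results, which bounds the data shipped to the gateway regardless of shard count.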

Q3: What's the story for multi-region deployments?

Problem: Replicate across US-East, US-West, EU for low-latency reads globally.

  • Option A: Single Raft group across regions (high write latency)
  • Option B: Regional Raft groups with async replication (eventual consistency)
  • Option C: Spanner-style Paxos with commit-wait (requires TrueTime)

Recommendation: Start single-region, then explore Option B (similar to CockroachDB follower reads).

Q4: How to handle hot shards?

Problem: Celebrity subject (e.g., "Donald Trump") receives 100K writes/sec.

  • Option A: Over-provision that shard (more CPU/disk)
  • Option B: Sub-shard by predicate (split "Donald Trump" + "nationality" vs "Donald Trump" + "age")
  • Option C: Use consistent hashing to split hot subject across multiple sub-shards

Recommendation: Monitor for hotspots, implement Option A initially, Option C if needed.
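
One possible shape for Option C is to salt the shard key of a hot subject with a value derived from the assertion ID, spreading its writes over a fixed number of sub-shards. This is a sketch under that assumption: `write_shard`, `read_shards`, and the `fanout` parameter are hypothetical, FNV-1a stands in for BLAKE3, and `num_shards` must be greater than zero.

```rust
/// Stand-in 64-bit hash (FNV-1a) so the sketch needs no external crates.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Shard for the key formed by a subject plus a salt.
fn sub_shard(subject: &str, salt: u64, num_shards: u32) -> u32 {
    let mut key = subject.as_bytes().to_vec();
    key.extend_from_slice(&salt.to_le_bytes());
    (fnv1a(&key) % u64::from(num_shards)) as u32
}

/// Shard for a write. A normal subject uses fanout 1 and maps to one shard;
/// a hot subject is spread over up to `fanout` sub-shards by assertion ID.
pub fn write_shard(subject: &str, assertion_id: u64, fanout: u32, num_shards: u32) -> u32 {
    let salt = assertion_id % u64::from(fanout.max(1));
    sub_shard(subject, salt, num_shards)
}

/// Shards a reader must query to see every assertion about `subject`.
pub fn read_shards(subject: &str, fanout: u32, num_shards: u32) -> Vec<u32> {
    let mut shards: Vec<u32> = (0..u64::from(fanout.max(1)))
        .map(|salt| sub_shard(subject, salt, num_shards))
        .collect();
    shards.sort_unstable();
    shards.dedup();
    shards
}
```

The cost is on the read side: "all claims about subject X" becomes a scatter-gather over up to `fanout` shards for hot subjects, and the gateway needs to know each hot subject's fanout, for example from the shard topology it already caches.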


Essential Papers

  1. Raft Consensus Algorithm - Diego Ongaro & John Ousterhout
  2. Logical Physical Clocks (HLC) - Kulkarni & Demirbas
  3. Spanner: Google's Globally-Distributed Database - Corbett et al.

Blog Posts

  1. Life of a CockroachDB Transaction
  2. Building a Distributed Log from Scratch

Books

  1. Designing Data-Intensive Applications - Martin Kleppmann (Chapters 5, 7, 9)

Conclusion

For Episteme's append-only, content-addressed knowledge graph, we can build a significantly simpler distributed write path than traditional databases:

  1. Use HLC instead of TrueTime - No specialized hardware needed
  2. Single-writer-per-partition - Eliminates distributed transactions
  3. Raft for replication - Simple, battle-tested consensus
  4. Hash-based sharding by subject - Natural data locality
  5. Async index updates - Decouple write path from read optimization

Expected outcome: A system that scales to millions of concurrent writers, handles petabytes of assertions, and maintains strong consistency guarantees—with ~1/5th the complexity of CockroachDB.

Next steps: Implement Phase 1 (HLC integration) to validate timestamp generation and causality tracking before investing in Raft infrastructure.

