tidaldb/docs/specs/14-scale-architecture.md

Scale Architecture Specification

Status: Draft
Author: tidalDB Engineering
Last Updated: 2026-02-20
Depends on: Storage Engine (01), Entity Model (02), Signal System (03), Cohorts (05), Vector Retrieval (07)


Table of Contents

  1. Design Philosophy
  2. Capacity Model
  3. Single-Node Ceiling
  4. Partitioning Strategy
  5. HNSW Index Sharding
  6. Signal Aggregation Distribution
  7. Query Routing and Scatter-Gather
  8. Consistency Model
  9. Replication Strategy
  10. The Single-Node to Distributed Path
  11. Operational Considerations
  12. Prior Art and Lessons Learned

1. Design Philosophy

tidalDB's VISION.md says: "It is not cloud-native first. It is embeddable first. It runs in your process. Distribution is a later problem." The product owner's requirement says: "Millions of users, billions of signal events, thousands of cohorts from day one."

These are not contradictory. They are a sequencing constraint. The architecture must be partitioning-ready in its data model, key encoding, and storage isolation from Phase 1, even though the distributed runtime ships later. The critical insight from production databases is that retrofitting partitioning onto a storage engine that was not designed for it is a multi-year rewrite. CockroachDB, TiDB, and Elasticsearch all built partitioning into their key encoding and storage layer from day one, even when they ran on a single node.

The principle: Build the atoms right. A single tidalDB process is a complete, self-contained shard. Distribution is the coordination of many shards, not a redesign of what a shard is.

What This Spec Covers

This specification answers: at every scale tier, what are the resource requirements, where does the current architecture hit its limits, and what changes are needed to push past those limits? It defines the partitioning strategy, HNSW sharding approach, signal distribution model, query routing, consistency guarantees, replication, and the phased path from a single node to a distributed cluster.

What This Spec Does Not Cover

  • Geo-distribution (multi-region replication, latency-aware placement). That is a later concern.
  • Multi-tenancy isolation (separate customers sharing a cluster). tidalDB is embedded, one tenant per instance.
  • Wire protocol for inter-node communication. That is specified when distribution ships.

2. Capacity Model

2.1 Dimensions of Scale

tidalDB's resource consumption is driven by five independent dimensions. The product must handle growth along any combination.

| Dimension | Symbol | Description |
|---|---|---|
| Items (content entities) | I | Videos, posts, articles, images in the catalog |
| Users (consumer entities) | U | Active user profiles with preferences and history |
| Signals per day | S/day | Engagement events: views, likes, skips, shares, etc. |
| Cohorts (named, exact-tracked) | C | Pre-defined population segments with dedicated counters |
| Signal types | T | Distinct signal kinds in schema (~40, effectively fixed) |

2.2 Fixed Constants

From the Signal System spec (03) and Entity Model spec (02):

| Constant | Value | Source |
|---|---|---|
| Signal types per entity | ~6 active (of ~40 defined) | Signal System spec, Section 11 |
| Windows per signal | 5 (1h, 24h, 7d, 30d, all_time) | Signal System spec, Section 2 |
| Decay rates per signal | 3 (stored in HotSignalState) | Signal System spec, Section 3 |
| Hot-tier bytes per signal state | 64 bytes (one cache line) | Storage Engine spec, Section 6.2 |
| Warm-tier bytes per active signal | ~1.8 KB | Signal System spec, Section 3 |
| Embedding dimensions | 1536 (primary content embedding) | Vector Retrieval spec, Section 2 |
| Bytes per vector (f16 quantized) | 1536 * 2 = 3,072 bytes | Vector Retrieval spec, Section 4 |
| HNSW graph overhead per vector (M=16) | ~128 bytes (M * 2 layers * 4 bytes) | USearch benchmarks, M=16 |
| Bytes per entity metadata (avg) | ~512 bytes | Entity Model spec, estimated |
| UserCohortMemberships per user | 22 bytes | Signal System spec, Section 7 |
| Cohort counter per item per signal per hour | 20 bytes (CohortBucket) | Signal System spec, Section 7 |

2.3 Scale Tiers

The following table models tidalDB at four scale tiers, from a startup deploying the first version to a large content platform.

| Metric | Tier 1: Seed | Tier 2: Growth | Tier 3: Scale | Tier 4: Hyperscale |
|---|---|---|---|---|
| Items | 1M | 10M | 100M | 1B |
| Users | 100K | 1M | 10M | 100M |
| Signals/day | 10M | 100M | 1B | 10B |
| Signals/sec (sustained) | ~116 | ~1,157 | ~11,574 | ~115,741 |
| Signals/sec (peak, 10x) | ~1,160 | ~11,570 | ~115,740 | ~1,157,410 |
| Named cohorts | 10 | 100 | 500 | 500 |
| Exact-tracked cohorts | 5 | 30 | 89 | 89 |
| Items with cohort tracking | 1K | 10K | 100K | 1M |

2.4 Per-Tier Resource Estimates

Memory

| Component | Tier 1 | Tier 2 | Tier 3 | Tier 4 |
|---|---|---|---|---|
| Hot-tier signal state (64B * I * 6 signals) | 384 MB | 3.8 GB | 38.4 GB | 384 GB |
| Warm-tier signal state (5% active * 1.8KB * 6) | 540 MB | 5.4 GB | 54 GB | 540 GB |
| HNSW index (3.2KB * I) | 3.2 GB | 32 GB | 320 GB | 3.2 TB |
| Entity metadata cache | 512 MB | 5 GB | 50 GB | 500 GB |
| User cohort memberships (22B * U) | 2.2 MB | 22 MB | 220 MB | 2.2 GB |
| Roaring bitmaps (cohort resolution) | 6.3 MB | 63 MB | 630 MB | 6.3 GB |
| Tantivy inverted index (est. 20% of text) | 200 MB | 2 GB | 20 GB | 200 GB |
| Total memory | ~4.8 GB | ~48 GB | ~483 GB | ~4.8 TB |

Critical observation. A single 64 GB node comfortably handles Tier 1 and can stretch to Tier 2 with selective hot-tier eviction (keeping ~2M entities hot instead of 10M). Tier 3 requires either a very large single node (512+ GB RAM) or partitioning: the HNSW index alone at 100M items requires 320 GB. Tier 4 is impossible on a single node; at 1B items the HNSW index alone requires 3.2 TB.
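The per-component rows of the memory table follow from the Section 2.2 constants. The following is a minimal Rust sketch of that arithmetic, assuming decimal units (1 GB = 10^9 bytes), which is what the table values appear to use. The function and constant names are illustrative and are not part of tidalDB's API.

```rust
/// Constants from Section 2.2. Names are illustrative, not tidalDB identifiers.
const BYTES_PER_VECTOR_F16: u64 = 1536 * 2; // 3,072 bytes
const HNSW_GRAPH_OVERHEAD: u64 = 16 * 2 * 4; // M=16 -> 128 bytes
const HOT_BYTES_PER_SIGNAL: u64 = 64; // one cache line
const ACTIVE_SIGNALS_PER_ENTITY: u64 = 6;
const WARM_BYTES_PER_SIGNAL: u64 = 1_800; // ~1.8 KB
const METADATA_BYTES_PER_ENTITY: u64 = 512;
const MEMBERSHIP_BYTES_PER_USER: u64 = 22;

/// HNSW index size: vector payload plus graph overhead (~3.2 KB per item).
pub fn hnsw_bytes(items: u64) -> u64 {
    items * (BYTES_PER_VECTOR_F16 + HNSW_GRAPH_OVERHEAD)
}

/// Hot-tier signal state: 64 B per signal, 6 active signals per item.
pub fn hot_tier_bytes(items: u64) -> u64 {
    items * ACTIVE_SIGNALS_PER_ENTITY * HOT_BYTES_PER_SIGNAL
}

/// Warm-tier signal state: only 5% of items (1 in 20) are active.
pub fn warm_tier_bytes(items: u64) -> u64 {
    (items / 20) * ACTIVE_SIGNALS_PER_ENTITY * WARM_BYTES_PER_SIGNAL
}

/// Entity metadata cache at the estimated 512 B average.
pub fn metadata_bytes(items: u64) -> u64 {
    items * METADATA_BYTES_PER_ENTITY
}

/// UserCohortMemberships at 22 B per user.
pub fn membership_bytes(users: u64) -> u64 {
    users * MEMBERSHIP_BYTES_PER_USER
}
```

For Tier 1 (1M items, 100K users) these functions return 3.2 GB, 384 MB, 540 MB, 512 MB, and 2.2 MB respectively, matching the table. The roaring bitmap and Tantivy rows are estimates with no closed form in this section, so they are left out of the sketch.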

Disk (Warm + Cold Storage)

| Component | Tier 1 | Tier 2 | Tier 3 | Tier 4 |
|---|---|---|---|---|
| WAL (7-day rolling) | 3.2 GB | 32 GB | 320 GB | 3.2 TB |
| Raw signal events (7-day, FIFO) | 22 GB | 224 GB | 2.24 TB | 22.4 TB |
| Hourly rollups (30-day) | 23 GB | 231 GB | 2.31 TB | 23.1 TB |
| Daily rollups (indefinite, 1yr) | 117 MB | 1.17 GB | 11.7 GB | 117 GB |
| Cohort dimensional rollups (7-day) | 3.2 GB | 31.6 GB | 316 GB | 3.16 TB |
| Entity metadata (redb) | 512 MB | 5 GB | 50 GB | 500 GB |
| HNSW index files (mmap) | 3.2 GB | 32 GB | 320 GB | 3.2 TB |
| Tantivy index files | 200 MB | 2 GB | 20 GB | 200 GB |
| Total disk | ~56 GB | ~558 GB | ~5.6 TB | ~56 TB |

Disk I/O (Sustained Write Throughput)

| Component | Tier 1 | Tier 2 | Tier 3 | Tier 4 |
|---|---|---|---|---|
| WAL writes | 0.5 MB/s | 5 MB/s | 50 MB/s | 500 MB/s |
| EVT SST flushes (2x WA) | 1 MB/s | 10 MB/s | 100 MB/s | 1 GB/s |
| SIG leveled compaction (~10x WA) | 0.2 MB/s | 2 MB/s | 20 MB/s | 200 MB/s |
| MV rollup writes (COW, 2x WA) | 0.06 MB/s | 0.6 MB/s | 6 MB/s | 60 MB/s |
| Total sustained disk write | ~1.8 MB/s | ~18 MB/s | ~176 MB/s | ~1.76 GB/s |

A modern NVMe SSD sustains 1-3 GB/s sequential writes. Tier 3 is within a single NVMe's write bandwidth. Tier 4 saturates a single NVMe and requires either RAID-0 striping or partitioning across multiple nodes.


3. Single-Node Ceiling

3.1 The Reference Node

For ceiling analysis, we define the reference node:

| Resource | Specification |
|---|---|
| CPU | 16 cores, 3.5 GHz (AMD EPYC 7003 or Intel Xeon 4th gen) |
| RAM | 64 GB DDR5 |
| Storage | 2 TB NVMe SSD (3.5 GB/s seq read, 3.0 GB/s seq write) |
| Network | 25 Gbps (irrelevant for single-node, relevant for replication) |

3.2 What Breaks First

The answer depends on the workload mix. We analyze each resource independently.

Memory: HNSW Is the Bottleneck

| Item Count | HNSW Memory (f16, 1536d, M=16) | Hot-Tier Signal State (6 signals) | Total |
|---|---|---|---|
| 1M | 3.2 GB | 384 MB | ~4 GB |
| 5M | 16 GB | 1.9 GB | ~20 GB |
| 10M | 32 GB | 3.8 GB | ~40 GB |
| 15M | 48 GB | 5.8 GB | ~58 GB |
| ~16M | ~51 GB | ~6.1 GB | ~64 GB |
| 50M | 160 GB | 19.2 GB | ~190 GB |
| 100M | 320 GB | 38.4 GB | ~380 GB |

On a 64 GB node, the HNSW index caps item count at ~16M with f16 quantization. Beyond that, either: (a) use scalar quantization (uint8, 4x compression, recall drops ~3-5%), pushing the ceiling to ~60M items; (b) use DiskANN/mmap-based index with SSD backing; or (c) shard the HNSW index across nodes.

The hot-tier signal state is manageable up to ~16M items (6.1 GB). Beyond that, the eviction policy (Section 6.3 of Storage Engine spec) keeps only 2M entities hot (~768 MB at 6 signals of 64 bytes each, or 128 MB per tracked signal) and loads the rest on demand from SSD (~50 us per miss).

Memory is the first bottleneck. The HNSW index drives it.
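The ~16M ceiling can be reproduced with one division. The sketch below assumes that about 80% of node RAM is available to the HNSW index, a figure inferred from the table above (~51 GB of 64 GB) and not stated elsewhere in this spec. `max_items` is a hypothetical helper name.

```rust
/// Largest item count whose HNSW index fits in the given share of RAM.
///
/// `hnsw_percent` is the share of RAM reserved for the index. The 80% used in
/// the examples is an assumption inferred from the ceiling table, not a
/// tidalDB configuration value.
pub fn max_items(ram_bytes: u64, hnsw_percent: u64, bytes_per_vector: u64) -> u64 {
    // Integer arithmetic avoids floating-point rounding at the boundary.
    let budget = ram_bytes / 100 * hnsw_percent;
    budget / bytes_per_vector
}
```

With 3,200 bytes per f16 vector (3,072 payload + 128 graph overhead), `max_items(64_000_000_000, 80, 3_200)` gives 16,000,000 and `max_items(512_000_000_000, 80, 3_200)` gives 128,000,000, in line with the ~16M and ~130M figures quoted in this section.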

CPU: Signal Writes vs Ranking Queries

| Operation | Per-Operation Cost | Throughput on 16 Cores |
|---|---|---|
| Signal write (full path: dedup + WAL + hot + warm + pref + rel) | ~1-5 us | 3.2M-16M writes/sec |
| Ranking query (200 candidates: ANN + signals + scoring + diversity) | ~10-50 ms | 320-1,600 queries/sec |
| Background materializer (minute rotation, 500K active entities) | ~100 ms every 60s | ~0.1% CPU |

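At Tier 3 (11.6K signals/sec sustained, ~116K peak), signal writes at 1-5 us each consume approximately 0.01-0.06 cores sustained and 0.12-0.58 cores at peak. At 1K ranking queries/sec with 50 ms each, ranking consumes approximately 50 cores of work per second, which exceeds the 16 cores of the reference node: at 50 ms per query the node tops out near 320 queries/sec, and 1K queries/sec is reachable only when the average query costs about 16 ms or less. Queries parallelize across cores (each query is sequential, but many queries run concurrently), so throughput scales with core count and careful scheduling keeps all cores busy.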

CPU is not the first bottleneck for Tier 2. At Tier 3, concurrent ranking queries under peak signal load may compete for cores, requiring query-priority scheduling.
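The core-count figures in this subsection come from multiplying an arrival rate by a per-operation cost. Here is a minimal sketch of that arithmetic, with hypothetical helper names, assuming each operation occupies one core for its full duration:

```rust
/// Cores kept busy by a workload: operations per second times seconds per operation.
pub fn cores_required(ops_per_sec: f64, cost_secs: f64) -> f64 {
    ops_per_sec * cost_secs
}

/// Highest sustainable rate when every core runs operations back to back.
pub fn max_throughput(cores: f64, cost_secs: f64) -> f64 {
    cores / cost_secs
}
```

`cores_required(1_000.0, 0.050)` is 50 cores for 1K ranking queries/sec at 50 ms each, and `max_throughput(16.0, 0.050)` is 320 queries/sec, the lower bound in the table above. The same function applied to signal writes, `cores_required(11_574.0, 5e-6)`, stays below 0.06 cores.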

Disk I/O: WAL Writes Are Cheap, Compaction Is the Risk

From Section 2.4, Tier 3 sustained disk write is ~176 MB/s. A single NVMe at 3 GB/s has 17x headroom. The risk is not bandwidth but IOPS during leveled compaction of the SIG keyspace, where random read-merge-write patterns can spike to thousands of IOPS. Modern NVMe drives sustain 500K+ random IOPS, so this is manageable.

Disk I/O is not the first bottleneck through Tier 3.

Disk Capacity

Tier 3 requires ~5.6 TB. A 2 TB NVMe is insufficient. Options: (a) mount a 4-8 TB NVMe (available); (b) use tiered storage with S3/object storage for cold rollups; (c) partition across nodes.

3.3 Single-Node Ceiling Summary

Single-Node Ceiling Analysis (64 GB RAM, 16 cores, 2 TB NVMe)

+-------------------------------------------------------------------+
|                    WHAT BREAKS FIRST: MEMORY                       |
|                                                                   |
|  HNSW Index (f16, 1536d, M=16)                                   |
|  +---------+---------+---------+---------+---------+              |
|  |   1M    |   5M    |   10M   |   16M   |   50M   |             |
|  |  3.2 GB | 16 GB   | 32 GB   | 51 GB   | 160 GB  |             |
|  +---------+---------+---------+---------+---------+              |
|        ^                         ^    ^                            |
|        |                         |    |                            |
|     Tier 1                  Tier 2  CEILING                       |
|     Comfortable             Tight   (64 GB node)                  |
|                                                                   |
|  Hot-tier signal state stays manageable through 16M items.        |
|  Warm-tier is sparse (5% active) -- no issue through Tier 2.     |
|                                                                   |
|  CPU: Comfortable through Tier 2. Tier 3 needs query scheduling. |
|  Disk I/O: Comfortable through Tier 3.                            |
|  Disk capacity: 2 TB covers Tier 2. Tier 3 needs 4-8 TB.         |
+-------------------------------------------------------------------+

Single-node practical ceiling:
  Items:       ~16M (with f16), ~60M (with uint8 quantization)
  Users:       ~5M (bounded by bitmap + membership cache)
  Signals/day: ~500M (bounded by CPU at peak)
  Cohorts:     ~89 exact-tracked (signal system limit, not node limit)

3.4 Large Single-Node Ceiling (512 GB RAM, 64 cores, 8 TB NVMe)

For completeness, a large dedicated machine extends the ceiling significantly:

| Dimension | 64 GB Node | 512 GB Node |
|---|---|---|
| Items (HNSW, f16) | ~16M | ~130M |
| Items (HNSW, uint8) | ~60M | ~500M |
| Users | ~5M | ~40M |
| Signals/day | ~500M | ~4B |
| Storage (disk) | 2 TB | 8 TB |

A 512 GB node can handle most of Tier 3. Tier 4 (1B items, 100M users, 10B signals/day) is unreachable on any single node. This is where distribution becomes necessary.


4. Partitioning Strategy

4.1 The Three Candidates

We evaluate three partitioning strategies against tidalDB's workload: the ranking query.

The ranking query touches:

  1. Vector index (ANN retrieval of ~500 candidates)
  2. Signal state (decay scores, velocity, windowed counts for ~200 scored candidates)
  3. Entity metadata (filtering: format, duration, category, etc.)
  4. Relationship state (user-item: unseen, blocked; user-creator: followed, interaction weight)
  5. Cohort counters (if FOR COHORT query: dimensional rollups)

The partitioning strategy must minimize the number of partitions touched per query while keeping data distribution even.

4.2 Option A: Hash Partitioning by Entity ID

Mechanism: shard = hash(entity_id) % num_shards. All data for a given entity (metadata, signals, relationships, aggregates) co-locates on one shard.

Hash Partitioning by Entity ID

     entity_id = 42             entity_id = 77
         |                           |
    hash(42) % 4 = 2           hash(77) % 4 = 1
         |                           |
         v                           v
   +----------+  +----------+  +----------+  +----------+
   | Shard 0  |  | Shard 1  |  | Shard 2  |  | Shard 3  |
   |          |  | ent 77   |  | ent 42   |  |          |
   | signals  |  | signals  |  | signals  |  | signals  |
   | metadata |  | metadata |  | metadata |  | metadata |
   | vectors  |  | vectors  |  | vectors  |  | vectors  |
   +----------+  +----------+  +----------+  +----------+
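A minimal sketch of the routing rule `shard = hash(entity_id) % num_shards`. The spec does not name a hash function; FNV-1a is used here purely as an assumption because it is short and produces the same value on every node and build, which a router needs. `shard_for` is a hypothetical name.

```rust
/// 64-bit FNV-1a. Chosen for this sketch only; the spec does not name a hash.
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &byte in bytes {
        hash ^= byte as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Maps an entity to its owning shard. Every key for that entity (metadata,
/// signals, relationships, aggregates) would be routed with the same call.
pub fn shard_for(entity_id: u64, num_shards: u32) -> u32 {
    assert!(num_shards > 0, "cluster must have at least one shard");
    // Big-endian bytes keep the result independent of host endianness.
    (fnv1a_64(&entity_id.to_be_bytes()) % num_shards as u64) as u32
}
```

Plain modulo routing remaps most keys when `num_shards` changes. Production systems typically add an indirection layer, such as the hash slots and consistent hashing used by the systems listed under "Used by" below, so that resharding moves only a fraction of the data.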

Strengths:

  • Even distribution (hash functions spread uniformly)
  • Single-shard reads for entity-scoped queries (signal snapshot, entity metadata)
  • Simple routing: one hash computation per key
  • No hot-shard risk from popular categories

Weaknesses:

  • ANN queries require scatter-gather across ALL shards. The HNSW graph is split; each shard has a partial graph. The top-K from each shard must be merged. At 16 shards with ef_search=200, this means 16 parallel HNSW traversals and a K-way merge.
  • Trending/velocity queries require scatter-gather. "What is trending globally?" must scan velocity data across all shards and merge.
  • Cohort trending is scatter-gather. Every shard has some items matching a cohort velocity query.
  • Category-scoped queries are scatter-gather. No co-location by category.

Used by: Redis Cluster (hash slots), Cassandra (consistent hashing), DynamoDB (hash key).

4.3 Option B: Range Partitioning by Category/Topic

Mechanism: Items are assigned to shards based on their primary category. shard = category_to_shard[item.category]. Categories are mapped to shard ranges, with hot categories split across multiple shards.

Range Partitioning by Category

   +-----------+  +-----------+  +-----------+  +-----------+
   | Shard 0   |  | Shard 1   |  | Shard 2   |  | Shard 3   |
   | music     |  | gaming    |  | cooking   |  | sports    |
   | dance     |  | tech      |  | fashion   |  | fitness   |
   | podcasts  |  | science   |  | beauty    |  | outdoors  |
   |           |  |           |  |           |  |           |
   | All music |  | All gaming|  | All cook  |  | All sport |
   | items +   |  | items +   |  | items +   |  | items +   |
   | signals   |  | signals   |  | signals   |  | signals   |
   +-----------+  +-----------+  +-----------+  +-----------+

Strengths:

  • Category-scoped queries hit a single shard
  • "Trending in music" is a local computation
  • Cohort trending within a category is co-located

Weaknesses:

  • Hot categories create massive skew. Gaming and music may have 100x the items and signals of niche categories. Rebalancing requires splitting hot categories across shards, which is complex.
  • Global trending still requires scatter-gather across all shards.
  • Cross-category queries (the common case for personalized feeds) are scatter-gather. The "For You" feed pulls from all categories based on user preference. This is the dominant query pattern.
  • HNSW index is still per-shard. ANN search for personalized retrieval (user preference vector vs all items) still fans out.
  • Items can belong to multiple categories, complicating placement.

Used by: Apache Druid (time-based partitioning), some Elasticsearch deployments (index-per-category), YouTube internal systems (reported category-based sharding).

4.4 Option C: Entity-Sharded with Replicated Global State

Mechanism: A hybrid approach:

  • Entity data (metadata, signals, embeddings) is hash-partitioned by entity_id across data shards. Each shard is a complete tidalDB instance for its entity subset.
  • HNSW index is replicated to all query nodes (or a subset of dedicated query nodes). At f16 quantization, 100M items require 320 GB, which fits in a large query node's memory or can be split into a small number of HNSW partitions.
  • Global and cohort trending aggregates are materialized on dedicated aggregation nodes that receive streaming updates from all data shards.

Option C: Hybrid Architecture

                    +-------------------+
                    |  Query Router     |
                    |  (stateless)      |
                    +--------+----------+
                             |
              +--------------+--------------+
              |              |              |
    +---------v--+  +--------v---+  +-------v----+
    | Query Node |  | Query Node |  | Query Node |
    | (read-only)|  | (read-only)|  | (read-only)|
    |            |  |            |  |            |
    | HNSW rep.  |  | HNSW rep.  |  | HNSW rep.  |
    | Full/Part  |  | Full/Part  |  | Full/Part  |
    | Tantivy    |  | Tantivy    |  | Tantivy    |
    | rep.       |  | rep.       |  | rep.       |
    +------+-----+  +------+-----+  +------+-----+
           |               |               |
           +-------+-------+-------+-------+
                   |               |
         +---------v---+  +--------v----+
         | Aggregation |  | Aggregation |
         | Node        |  | Node        |
         | Global vel. |  | Cohort vel. |
         | Top-K mats  |  | Trending    |
         +------+------+  +------+------+
                |                 |
    +-----------+-----------+-----------+
    |           |           |           |
+---v----+ +---v----+ +---v----+ +---v----+
| Data   | | Data   | | Data   | | Data   |
| Shard 0| | Shard 1| | Shard 2| | Shard 3|
|        | |        | |        | |        |
| Entity | | Entity | | Entity | | Entity |
| signals| | signals| | signals| | signals|
| WAL    | | WAL    | | WAL    | | WAL    |
+--------+ +--------+ +--------+ +--------+

Strengths:

  • ANN queries are local. Each query node has a full (or large-partition) HNSW replica. No scatter-gather for vector search.
  • Trending queries are local to aggregation nodes. Pre-materialized global and cohort velocity data is served without fan-out.
  • Signal writes are single-shard. Each signal event routes to the shard owning the target item's entity_id.
  • Personalized ranking queries touch one query node (for ANN + metadata filtering) plus one aggregation node (for velocity signals). Two hops, not N-shard scatter-gather.
  • Read-write separation. Data shards handle writes, query nodes handle reads. Independent scaling.

Weaknesses:

  • HNSW replication cost. Replicating a 320 GB index (100M items) to multiple query nodes requires memory-rich machines and a replication pipeline for index updates.
  • Aggregation lag. Global velocity on the aggregation node is eventually consistent with the data shards. Lag bounded by the streaming pipeline latency (target: <5 seconds).
  • Operational complexity. Three node roles (data shard, query node, aggregation node) vs one in the single-node design.
  • Entity metadata must be accessible to query nodes for filtering. Either replicate metadata or fetch on demand from data shards per query.

Used by (components): Vespa (content nodes + container nodes, HNSW per content group), Elasticsearch (data nodes + coordinating nodes), Pinecone (storage nodes + query nodes with index replicas), Milvus (data nodes + query nodes + index nodes).

4.5 Recommendation: Option C (Entity-Sharded with Replicated Global State)

Option C is the recommended partitioning strategy. The evidence:

  1. The dominant query pattern is the personalized ranking query. This query combines ANN retrieval (global), signal scoring (per-entity), and metadata filtering (per-entity). Option A forces every ranking query into an all-shard scatter-gather for ANN. Option B forces every "For You" query into a cross-category scatter-gather. Option C makes the common case fast: ANN on a local replica, signal data from a targeted shard or pre-materialized aggregate.

  2. Production vector databases converge on this pattern. Vespa, Pinecone, and Milvus all separate index-serving nodes from data-storage nodes. Vespa's content groups hold full replicas of the document set per group, with HNSW per group. Pinecone's pod architecture separates storage from query processing. Milvus explicitly separates data nodes, index nodes, and query nodes.

  3. Trending is a materialized view, not a live scan. The Signal System spec (Section 9) already designs trending velocity as a background-materialized aggregate. The aggregation node in Option C is the natural home for this materialized state in a distributed deployment. It receives a stream of signal events from all data shards and maintains the same materialized aggregates that the background materializer maintains in the single-node case.

  4. The key encoding already supports this. The entity-id-prefix encoding from Storage Engine spec Section 5 means every data shard is a self-contained tidalDB instance for its entity range. No code changes are needed in the storage layer -- only a routing layer is added above it.

  5. Read-write separation matches the workload. tidalDB's workload is read-dominated for ranking queries and write-heavy for signal ingestion. Option C allows scaling reads (add query nodes) independently of writes (add data shards). This matches the load profile exactly.

The key trade-off is HNSW replication cost. This is addressed in Section 5 (HNSW Index Sharding).

4.6 Partitioning Strategy Comparison Matrix

| Criterion | Option A: Hash by Entity | Option B: Range by Category | Option C: Hybrid (Recommended) |
|---|---|---|---|
| ANN query routing | ALL shards (scatter-gather) | ALL shards (scatter-gather) | 1 query node (local replica) |
| Entity signal read | 1 shard (local) | 1 shard (local) | 1 data shard (targeted) |
| Global trending | ALL shards (scatter-gather) | ALL shards (scatter-gather) | 1 aggregation node (local) |
| Cohort trending | ALL shards (scatter-gather) | 1 shard (if category-scoped) | 1 aggregation node (local) |
| "For You" feed | ALL shards (ANN + signals) | ALL shards (cross-category) | 1 query + 1 aggregation node |
| Signal write routing | 1 shard (hash of item_id) | 1 shard (item's category) | 1 data shard (hash of item_id) |
| Distribution evenness | Excellent (hash) | Poor (category skew) | Excellent (hash on data shards) |
| HNSW memory per query node | Partial index (1/N of items) | Partial index (1/N of items) | Full or large-partition replica |
| Operational complexity | Low (uniform nodes) | Medium (hot-shard management) | High (3 node roles) |
| Signal write amplification | 1x (single shard) | 1x (single shard) | 1x (single shard) + streaming to aggregation |
| Scatter-gather queries | All non-entity-scoped queries | All non-category-scoped queries | None across all shards; only targeted per-entity reads to the owning data shards |

5. HNSW Index Sharding

5.1 The Problem

HNSW does not partition naturally. The graph's power comes from long-range connections at higher layers that span the entire vector space. Splitting the graph into disjoint partitions severs these connections, degrading recall. Any approach to distributing HNSW must account for this.

5.2 Approaches Surveyed

Approach 1: Full Replica Per Query Node

Every query node holds a complete copy of the HNSW index.

Memory per query node:

| Item Count | f16 Memory | uint8 Memory |
|---|---|---|
| 10M | 32 GB | 16 GB |
| 50M | 160 GB | 80 GB |
| 100M | 320 GB | 160 GB |
| 500M | 1.6 TB | 800 GB |

Strengths: No recall loss (full graph connectivity). Single query node handles ANN without network hops. Simplest query path.

Weaknesses: Memory-prohibitive at 100M+ items with f16. Replication of index updates (new vectors, deleted vectors) must propagate to all query nodes.

Production usage: Vespa uses this approach within content groups (each group holds a full replica). Works well up to ~50M vectors with quantization on a 512 GB machine.

Applicable range: Up to ~50M items (f16) or ~200M items (uint8) on 512 GB query nodes.

Approach 2: IVF-Partitioned HNSW

The vector space is divided into clusters using k-means. Each partition gets its own HNSW graph. At query time, the query is routed to the nearest n_probe partitions, and the top-K from each are merged.

IVF-Partitioned HNSW

Step 1: Cluster all vectors into K partitions (k-means)
Step 2: Build independent HNSW per partition
Step 3: At query time:
   a. Compare query to K cluster centroids
   b. Select top n_probe nearest clusters
   c. Search HNSW in each selected partition
   d. Merge top-K results from n_probe partitions

         Query Vector
              |
    Compare to centroids
              |
    +----+----+----+----+
    | C0 | C1 | C2 | C3 |   (K=4 partitions)
    +----+----+----+----+
              |
    Select top 2 (n_probe=2)
              |
     +--------+--------+
     |                  |
  +--v---+          +---v--+
  | HNSW |          | HNSW |
  | Part1|          | Part2|
  | top-K|          | top-K|
  +--+---+          +---+--+
     |                  |
     +--------+---------+
              |
         Merge top-K
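Steps 3a, 3b, and 3d above can be sketched in a few lines of Rust. This is an illustration under stated assumptions, not the planned implementation: centroids are compared by squared Euclidean distance, per-partition results carry a distance where smaller is closer, and the per-partition HNSW search (step 3c) is left to the caller. All function names are hypothetical.

```rust
/// Squared Euclidean distance between two vectors of equal length.
fn squared_l2(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Steps 3a-3b: rank all centroids by distance to the query and return the
/// indices of the `n_probe` nearest partitions.
pub fn select_partitions(query: &[f32], centroids: &[Vec<f32>], n_probe: usize) -> Vec<usize> {
    let mut ranked: Vec<(usize, f32)> = centroids
        .iter()
        .enumerate()
        .map(|(i, c)| (i, squared_l2(query, c)))
        .collect();
    // total_cmp gives a total order over f32, so NaN cannot panic the sort.
    ranked.sort_by(|a, b| a.1.total_cmp(&b.1));
    ranked.into_iter().take(n_probe).map(|(i, _)| i).collect()
}

/// Step 3d: merge the per-partition top-K lists of (entity id, distance)
/// into one global top-K, closest first.
pub fn merge_top_k(partials: Vec<Vec<(u64, f32)>>, k: usize) -> Vec<(u64, f32)> {
    let mut all: Vec<(u64, f32)> = partials.into_iter().flatten().collect();
    all.sort_by(|a, b| a.1.total_cmp(&b.1));
    all.truncate(k);
    all
}
```

Sorting the concatenated lists costs O(n_probe * K log(n_probe * K)), which is negligible for the n_probe and K values discussed here. A heap-based K-way merge would only matter at much larger fan-out.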

Strengths: Each partition requires 1/K of the memory of a full replica. At K=8 and 100M items, each partition HNSW is ~40 GB (f16), manageable on a 64 GB machine. Centroids are tiny: K * 1536 * 4 bytes is 24 KB for K=4 and still only ~24 MB for K=4096.

Weaknesses: Recall degrades at partition boundaries. Vectors near the boundary of two clusters may be in a "wrong" partition relative to a given query. Increasing n_probe recovers recall at the cost of more parallel searches. Research shows: at K=32 and n_probe=4, recall@100 drops ~3-5% vs full HNSW. At n_probe=8, recall recovers to within 1%. Probing multiple partitions is the standard technique in FAISS (IVF_HNSW); DiskANN addresses the same problem with overlapping partitions.

Production usage: Milvus uses IVF-based partitioning. FAISS IVF_HNSW is the standard large-scale approach. Alibaba's ADBV uses IVF-partitioned Vamana for 2B vectors across 32 shards.

Applicable range: 50M to 1B items. The sweet spot for tidalDB's distributed phase.

Approach 3: DiskANN/SSD-Backed Index

Use DiskANN (Vamana graph on SSD) instead of in-memory HNSW. The graph structure resides on NVMe SSD with only the compressed vectors and navigation metadata in memory. Query-time latency increases from ~1-5ms to ~5-15ms due to SSD reads, but memory consumption drops dramatically.

Memory per node:

| Item Count | In-Memory (nav data + compressed vectors) | SSD (full graph) |
|---|---|---|
| 100M | ~25 GB (PQ-compressed) | 320 GB |
| 1B | ~250 GB (PQ-compressed) | 3.2 TB |

Strengths: 40x cheaper than in-memory HNSW (per the DiskANN blog post by Wilson Lin). A single large-NVMe node can serve 1B vectors. Fits tidalDB's "vertical first" philosophy.

Weaknesses: Latency increases to 5-15ms for ANN (vs 1-5ms in-memory). For tidalDB's 50ms end-to-end budget with ANN as one phase, this is acceptable but leaves less headroom for scoring. Not a Rust-native library (DiskANN is C++, requires FFI like USearch).

Production usage: Microsoft's Bing search uses DiskANN. Wilson Lin demonstrated 96 GB RAM for 1B vectors (vs 3 TB for HNSW). VLDB 2025 papers show SSD-backed approaches achieving 2-3ms latency at billion scale.

Applicable range: 50M to 1B+ items on a single node. The "delay distribution" option.

5.3 Recommendation: Tiered Strategy

| Scale Tier | Items | HNSW Strategy | Memory Per Query Node |
|---|---|---|---|
| Tier 1 (Seed) | 1M | Full in-memory HNSW (f16) | 3.2 GB |
| Tier 2 (Growth) | 10M | Full in-memory HNSW (f16) | 32 GB |
| Tier 2+ | 10-50M | Full in-memory HNSW (uint8 or f16) | 32-160 GB |
| Tier 3 (Scale) | 50-100M | IVF-partitioned HNSW or DiskANN | 40-80 GB per partition or 25 GB with DiskANN |
| Tier 4 (Hyperscale) | 100M-1B | IVF-partitioned HNSW across query nodes | 40-80 GB per query node |

Phase 1-2 (current target): Full in-memory HNSW with f16 quantization. No sharding needed. The VectorIndex trait from Vector Retrieval spec Section 11 abstracts the underlying implementation.

Phase 3 (first distribution need for HNSW): Evaluate DiskANN (delays distribution, extends single-node ceiling to ~500M items at 5-15ms latency) vs IVF-partitioned HNSW (distributes to query nodes, maintains 1-5ms latency). The choice depends on whether the latency budget can absorb SSD access time.

Phase 4: IVF-partitioned HNSW across dedicated query nodes. Each query node holds K/N partitions (where K is total partitions and N is query nodes). Queries route to the nodes holding the nearest centroids.

The trait abstraction in Vector Retrieval spec Section 11 must support this. The VectorIndex::search() method returns Vec<(EntityId, f32)>. Whether that search hits an in-memory HNSW, a DiskANN graph, or an IVF-routed multi-node search is invisible to the caller.
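The sketch below shows what such an abstraction might look like. Only the return type `Vec<(EntityId, f32)>` comes from this spec; the `query` and `k` parameters, the `EntityId(u64)` representation, the reading of the `f32` as a distance (smaller is closer), and the brute-force backend are all assumptions made for illustration. The authoritative definition is in Vector Retrieval spec Section 11.

```rust
/// Assumed representation; the real EntityId is defined in the Entity Model spec.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EntityId(pub u64);

/// Hypothetical shape of the trait. Only the return type is taken from the spec.
pub trait VectorIndex {
    /// Returns up to `k` (entity, distance) pairs, closest first.
    fn search(&self, query: &[f32], k: usize) -> Vec<(EntityId, f32)>;
}

/// Exhaustive-scan backend, standing in for HNSW, DiskANN, or an IVF router.
pub struct BruteForceIndex {
    vectors: Vec<(EntityId, Vec<f32>)>,
}

impl BruteForceIndex {
    pub fn new(vectors: Vec<(EntityId, Vec<f32>)>) -> Self {
        Self { vectors }
    }
}

impl VectorIndex for BruteForceIndex {
    fn search(&self, query: &[f32], k: usize) -> Vec<(EntityId, f32)> {
        let mut scored: Vec<(EntityId, f32)> = self
            .vectors
            .iter()
            .map(|(id, v)| {
                // Squared Euclidean distance to the query.
                let d = v.iter().zip(query).map(|(a, b)| (a - b) * (a - b)).sum::<f32>();
                (*id, d)
            })
            .collect();
        scored.sort_by(|a, b| a.1.total_cmp(&b.1));
        scored.truncate(k);
        scored
    }
}

/// Caller code depends only on the trait, so the backend can be swapped.
pub fn top_ids(index: &dyn VectorIndex, query: &[f32], k: usize) -> Vec<EntityId> {
    index.search(query, k).into_iter().map(|(id, _)| id).collect()
}
```

Because `top_ids` takes `&dyn VectorIndex`, replacing `BruteForceIndex` with a different backend would not change the ranking code that calls it.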


6. Signal Aggregation Distribution

6.1 The Fan-Out Problem

A signal event (user U views item I) must update:

  1. Global counter for item I (1 increment)
  2. Level 1 dimensional counters (region, language, age_group) for item I (3 increments, if cohort-tracked)
  3. Level 2 segment counters for each of user U's segment memberships (~5-10 increments, if cohort-tracked)

From Signal System spec Section 7, average write amplification is 1.13x (because 99% of items are below the cohort activation threshold).
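The 1.13x figure is a weighted average of the two write paths. A minimal sketch, assuming 1% of events land on cohort-tracked items and each of those costs 14 increments (1 global + 3 dimensional + up to 10 segment counters, per the list above); the function name is illustrative:

```rust
/// Average counter increments per signal event.
///
/// `tracked_fraction` is the share of events that hit cohort-tracked items;
/// `tracked_increments` is the increment count for those events. Untracked
/// events increment only the global counter.
pub fn blended_write_amplification(tracked_fraction: f64, tracked_increments: f64) -> f64 {
    (1.0 - tracked_fraction) * 1.0 + tracked_fraction * tracked_increments
}
```

`blended_write_amplification(0.01, 14.0)` evaluates to 0.99 + 0.14 = 1.13. The result is sensitive to the tracked share: at 10% it would be 2.3x.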

6.2 Distribution of Signal Aggregation

In the distributed architecture (Option C), signal writes flow as follows:

Signal Write Distribution

  Application: db.signal("view", item: "X", user: "U")
       |
       v
  +------------------+
  |  Signal Router   |  Stateless. Routes by hash(item_id).
  +--------+---------+
           |
           v
  +--------+---------+
  | Data Shard       |  Owns item X's entity data.
  | (item X's shard) |
  |                  |
  | 1. Dedup check   |
  | 2. WAL append    |
  | 3. Hot-tier      |  <-- local to this shard
  |    update        |
  | 4. Warm-tier     |
  |    update        |
  | 5. Stream event  |
  |    to aggregation|
  +--------+---------+
           |
           | Streaming (WAL tailing or change feed)
           v
  +--------+---------+
  | Aggregation Node |  Maintains global and cohort velocity.
  |                  |
  | 1. Increment     |
  |    global counter|
  | 2. Increment     |
  |    Level 1 dims  |
  | 3. Increment     |
  |    Level 2 segs  |
  | 4. Update        |
  |    trending mats |
  +------------------+

Key design decision: Per-entity signal state (decay scores, windowed counts) lives on the data shard that owns the entity. Global and cohort-scoped aggregates (velocity, trending materialized views) live on the aggregation node. This split matches the access pattern:

  • Ranking queries that score individual candidates read per-entity signal state from the data shard (or from a cached snapshot on the query node).
  • Trending queries that rank by velocity across all items read from the aggregation node's pre-materialized top-K lists.
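The split between per-entity state and global aggregates can be sketched with an in-process channel standing in for the streaming pipeline. This is an illustration only: the type names are hypothetical, counters replace the real decay and window state, and dedup and WAL append are omitted. In the design above the feed would be WAL tailing or a change feed between nodes.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};

/// One engagement event. Field names are illustrative.
#[derive(Debug, Clone, PartialEq)]
pub struct SignalEvent {
    pub item_id: u64,
    pub signal: &'static str,
}

/// Data shard: owns per-entity state and forwards each accepted event.
pub struct DataShard {
    hot_counts: HashMap<(u64, &'static str), u64>,
    change_feed: Sender<SignalEvent>,
}

impl DataShard {
    pub fn new(change_feed: Sender<SignalEvent>) -> Self {
        Self { hot_counts: HashMap::new(), change_feed }
    }

    /// Steps 3-5 of the write path: local update, then stream to aggregation.
    pub fn write(&mut self, event: SignalEvent) {
        *self.hot_counts.entry((event.item_id, event.signal)).or_insert(0) += 1;
        // A send error means the aggregation side is gone. The local write
        // still succeeds, and the aggregator would catch up from the WAL.
        let _ = self.change_feed.send(event);
    }

    pub fn count(&self, item_id: u64, signal: &'static str) -> u64 {
        self.hot_counts.get(&(item_id, signal)).copied().unwrap_or(0)
    }
}

/// Aggregation node: maintains global counters and serves top-K lists.
#[derive(Default)]
pub struct AggregationNode {
    global_counts: HashMap<(u64, &'static str), u64>,
}

impl AggregationNode {
    /// Applies every event currently queued on the feed without blocking.
    pub fn drain(&mut self, feed: &Receiver<SignalEvent>) {
        for event in feed.try_iter() {
            *self.global_counts.entry((event.item_id, event.signal)).or_insert(0) += 1;
        }
    }

    /// Top-K items for one signal, highest count first, ties broken by item id.
    pub fn top_k(&self, signal: &str, k: usize) -> Vec<(u64, u64)> {
        let mut rows: Vec<(u64, u64)> = self
            .global_counts
            .iter()
            .filter(|((_, s), _)| *s == signal)
            .map(|((item, _), count)| (*item, *count))
            .collect();
        rows.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
        rows.truncate(k);
        rows
    }
}
```

Until `drain` runs, the aggregation node's view trails the data shard's. That gap is the aggregation lag the architecture accepts in exchange for single-shard writes.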

6.3 Cohort Aggregation at Scale

The critical question: with 10K exact-tracked cohorts, a single signal event would require up to 10K atomic increments. Is this feasible?

Answer: No. And the architecture already prevents it.

The Signal System spec (Section 7) and Cohort spec (Section 15) impose hard limits:

| Constraint | Value | Effect |
|---|---|---|
| Max Level 2 exact-tracked segments | 89 (100 minus 11 base behavioral) | Write amplification capped at ~14x for cohort-tracked items |
| Cohort activation threshold | 100 events/hour per item | Only ~100K items (1% at Tier 2) have cohort tracking active |
| Blended write amplification | 1.13x | 99% of events increment only the global counter |

The 10K cohort scenario is handled by the Level 3 estimation approach. The Cohort spec Section 13 specifies: composite cohorts (intersections of Level 1 and Level 2 dimensions) are estimated at query time, not pre-computed at write time. Only 89 cohorts get exact tracking. The remaining 411 (of 500 max named cohorts) use the independence-assumption estimator.
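The 1.13x blended figure follows directly from the constants in the constraint table. A minimal sketch of that arithmetic, assuming (as the spec does) that 1% of events hit cohort-tracked items and that each such event fans out to about 14 counters. The function name is illustrative and not part of the tidalDB API.

```rust
/// Blended write amplification: the average number of counter increments
/// per signal event, given the fraction of events that hit cohort-tracked
/// items and the fan-out those events incur.
/// (Hypothetical helper for illustration; not a tidalDB API.)
pub fn blended_write_amplification(tracked_fraction: f64, tracked_fanout: f64) -> f64 {
    // Events on untracked items increment only the global counter.
    let untracked = (1.0 - tracked_fraction) * 1.0;
    // Events on tracked items increment global, Level 1, and Level 2 counters.
    let tracked = tracked_fraction * tracked_fanout;
    untracked + tracked
}
```

With `tracked_fraction = 0.01` and `tracked_fanout = 14.0` this evaluates to 0.99 + 0.14 = 1.13. Setting `tracked_fraction = 1.0` and `tracked_fanout = 10_000.0` reproduces the infeasible 10K-increments-per-event scenario that the hard limits rule out.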

Tiered cohort strategy for distribution:

| Cohort Tier | Count | Tracking | Where Computed |
|---|---|---|---|
| Level 0: Global | 1 | Exact, always | Data shard (local) + aggregation node |
| Level 1: Primary dimensions | ~56 | Exact, for cohort-tracked items | Data shard (local) + aggregation node |
| Level 2: Behavioral segments + exact cohorts | Up to 89 | Exact, for cohort-tracked items | Data shard (local) + aggregation node |
| Level 3: Composite / estimated | Up to 411 | Estimated at query time | Aggregation node (from Level 1 + Level 2 data) |
| Ad-hoc: Inline predicates | Unlimited | Estimated at query time | Query node (bitmap intersection + aggregation node data) |

6.4 Aggregation Node Architecture

The aggregation node receives a stream of signal events from all data shards and maintains:

  1. Global velocity per item per signal (the same data the background materializer computes in the single-node case)
  2. Level 1 and Level 2 cohort-scoped velocity per item per signal
  3. Pre-materialized trending top-K lists (global, per-region, per-segment)
  4. Cohort activation threshold monitoring (which items cross 100 events/hour)

The aggregation node holds no authoritative state: everything it maintains is derived from the data shard WALs. If it crashes, it rebuilds by replaying WAL tails from all data shards (from the last checkpoint).

Scaling aggregation: At Tier 4 (115K signals/sec), a single aggregation node processes ~115K events/sec with ~14x average fan-out for the 1% of events hitting cohort-tracked items, yielding ~130K counter increments/sec. An atomic increment takes ~20ns, so the aggregation workload is ~2.6ms of CPU per second. This is trivially handled by a single aggregation node. At extreme scale, the aggregation workload can be partitioned by item_id range across multiple aggregation nodes.
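Because the aggregation node's state is derived entirely from WAL streams, recovery only works if replaying an overlapping WAL tail does not double-count. A minimal sketch of one way to get that property, assuming each streamed event carries its shard id and a per-shard, strictly increasing WAL sequence number. The type and field names are assumptions for illustration, not the tidalDB implementation.

```rust
use std::collections::HashMap;

/// One signal event as seen by the aggregation node.
/// Field names are assumptions for this sketch.
#[derive(Clone, Copy, Debug)]
pub struct StreamedEvent {
    pub shard_id: u32,
    pub seqno: u64, // per-shard WAL sequence number, strictly increasing
    pub item_id: u64,
    pub signal_id: u32,
}

/// Derived state: counters plus the highest WAL seqno applied per shard.
#[derive(Default, Debug, PartialEq)]
pub struct AggregationState {
    counters: HashMap<(u64, u32), u64>,
    applied: HashMap<u32, u64>,
}

impl AggregationState {
    /// Applies an event at most once. Events at or below the shard's
    /// applied seqno are skipped, so replaying an overlapping WAL tail
    /// after a crash leaves the counters unchanged.
    pub fn apply(&mut self, ev: StreamedEvent) -> bool {
        let last = self.applied.get(&ev.shard_id).copied();
        if matches!(last, Some(s) if ev.seqno <= s) {
            return false;
        }
        *self.counters.entry((ev.item_id, ev.signal_id)).or_insert(0) += 1;
        self.applied.insert(ev.shard_id, ev.seqno);
        true
    }

    /// Current count for an (item, signal) pair; zero if never seen.
    pub fn count(&self, item_id: u64, signal_id: u32) -> u64 {
        self.counters.get(&(item_id, signal_id)).copied().unwrap_or(0)
    }
}
```

A checkpoint in this model is the `applied` map plus the counters. On restart the node resumes tailing each shard from its recorded seqno, and any events delivered twice are ignored.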


7. Query Routing and Scatter-Gather

7.1 Query Types and Routing

Query Routing Flowchart

  Incoming Query
       |
       v
  +------------------+
  | Query Router     |
  | (stateless)      |
  +--------+---------+
           |
     +-----+-----+-----------+
     |           |           |
     v           v           v
  Ranking     Trending    Entity
  Query       Query       Lookup
     |           |           |
     v           v           v
  Query       Aggregation  Data
  Node        Node         Shard

| Query Type | Example | Routing | Nodes Touched |
|---|---|---|---|
| Personalized feed | RETRIEVE items FOR USER @u USING PROFILE for_you | Query node (ANN + metadata filter + scoring) | 1 query node + signal data from shard(s) for top-200 candidates |
| Global trending | RETRIEVE items USING PROFILE trending WINDOW 24h | Aggregation node (pre-materialized top-K) | 1 aggregation node |
| Cohort trending | RETRIEVE items USING PROFILE trending FOR COHORT young_us_jazz | Aggregation node (cohort-scoped top-K) | 1 aggregation node |
| Search | SEARCH items QUERY "piano" USING PROFILE search | Query node (Tantivy + optional ANN + scoring) | 1 query node |
| Search within trending | SEARCH items QUERY "piano" WITHIN TRENDING FOR COHORT young_us_jazz | Aggregation node (candidate set) then query node (text search within candidates) | 1 aggregation + 1 query node |
| Entity signal snapshot | GET item:@id SIGNALS | Data shard owning item_id | 1 data shard |
| Related items | RETRIEVE items RELATED TO item:@id | Query node (ANN with item's embedding as query) | 1 query node |
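The routing column of this table can be read as a pure function from query type to the node roles visited. A minimal sketch of that mapping; the enum and function names are hypothetical, chosen only to mirror the table rows, and the personalized-feed entry models the primary route only (its signal enrichment reads from data shards are covered in Section 7.3).

```rust
/// Node roles in the distributed architecture (Option C).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    Query,
    Aggregation,
    DataShard,
}

/// Query types from the routing table. Names are illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    PersonalizedFeed,
    GlobalTrending,
    CohortTrending,
    Search,
    SearchWithinTrending,
    EntitySignalSnapshot,
    RelatedItems,
}

/// Returns the node roles a query is routed to, in visiting order.
pub fn route(kind: QueryKind) -> &'static [NodeRole] {
    match kind {
        QueryKind::PersonalizedFeed | QueryKind::Search | QueryKind::RelatedItems => {
            &[NodeRole::Query]
        }
        QueryKind::GlobalTrending | QueryKind::CohortTrending => &[NodeRole::Aggregation],
        // The aggregation node supplies the candidate set, then the query
        // node runs text search within those candidates.
        QueryKind::SearchWithinTrending => &[NodeRole::Aggregation, NodeRole::Query],
        QueryKind::EntitySignalSnapshot => &[NodeRole::DataShard],
    }
}
```

Keeping the router a stateless lookup like this matches the "Query Router (stateless)" box in the flowchart: the only state it needs is the shard registry for entity lookups.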

7.2 Ranking Query Execution (Distributed)

The ranking query is the most complex. Here is the distributed execution plan for RETRIEVE items FOR USER @u USING PROFILE for_you LIMIT 50:

Distributed Ranking Query Execution

  Phase 1: User Context Load                          ~2ms
  +-----------------------------------------------+
  | Load user @u's preference vector               |
  | Load user @u's relationship state (follows,    |
  |   blocks, seen set)                            |
  | Load user @u's cohort memberships              |
  | Source: user's data shard (or cached on query  |
  |   node from recent queries)                    |
  +-----------------------------------------------+
                      |
  Phase 2: ANN Candidate Retrieval                    ~5ms
  +-----------------------------------------------+
  | Query HNSW with user preference vector         |
  | Filter: unseen, unblocked (predicate callback) |
  | Return top-500 candidate item_ids              |
  | Source: local HNSW replica on query node       |
  +-----------------------------------------------+
                      |
  Phase 3: Signal Enrichment                          ~10ms
  +-----------------------------------------------+
  | For each of 200 candidates (after coarse       |
  | metadata filtering):                           |
  |   Read decay scores (hot-tier)                 |
  |   Read velocity (warm-tier / aggregation node) |
  |   Read user-item relationship weight           |
  |                                                |
  | Two sources:                                   |
  | a. Signal snapshot cache on query node (if     |
  |    recently refreshed)                         |
  | b. Targeted reads to data shards owning each   |
  |    candidate (batched by shard, parallel)       |
  +-----------------------------------------------+
                      |
  Phase 4: Scoring                                    ~1ms
  +-----------------------------------------------+
  | Apply ranking profile to 200 candidates        |
  | Combine: ANN distance, decay scores, velocity, |
  |   relationship weight, cohort boost            |
  +-----------------------------------------------+
                      |
  Phase 5: Diversity and Result Assembly              ~1ms
  +-----------------------------------------------+
  | Apply max_per_creator, format_mix              |
  | Select top 50                                  |
  | Assemble response with signal snapshots        |
  +-----------------------------------------------+

  Total: ~19ms (well within 50ms budget)
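Phase 5 is cheap because it is a single greedy pass over already-scored candidates. A minimal sketch of the max_per_creator step, assuming each candidate carries a creator id and a final score. The struct and function names are hypothetical, and format_mix is not modeled here.

```rust
use std::collections::HashMap;

/// A scored candidate. Field names are assumptions for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Scored {
    pub item_id: u64,
    pub creator_id: u64,
    pub score: f64,
}

/// Sorts by descending score, then greedily keeps at most
/// `max_per_creator` items per creator until `limit` items are selected.
pub fn select_diverse(
    mut candidates: Vec<Scored>,
    max_per_creator: usize,
    limit: usize,
) -> Vec<Scored> {
    candidates.sort_by(|a, b| b.score.total_cmp(&a.score));
    let mut per_creator: HashMap<u64, usize> = HashMap::new();
    let mut out = Vec::with_capacity(limit.min(candidates.len()));
    for c in candidates {
        if out.len() == limit {
            break;
        }
        let seen = per_creator.entry(c.creator_id).or_insert(0);
        if *seen < max_per_creator {
            *seen += 1;
            out.push(c);
        }
    }
    out
}
```

For 200 candidates and a limit of 50 this is one sort plus at most 200 hash-map lookups, which is consistent with the ~1ms estimate for the phase.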

7.3 Signal Enrichment: The Scatter-Gather Trade-off

Phase 3 (signal enrichment) is the only phase that may require cross-shard reads in Option C. The 200 candidate items are distributed across data shards by hash(item_id). With 4 data shards, each shard holds ~50 of the 200 candidates.

Approach 1: Batched parallel reads to data shards.

  • 4 parallel requests, each reading ~50 entity signal states
  • Per-shard read: 50 entities * ~500 ns per entity (hot-tier or fjall memtable) = ~25 us
  • Network round-trip: ~100-500 us (same-rack)
  • Total: ~500 us + 25 us = ~525 us. Acceptable.

Approach 2: Signal snapshot cache on query nodes.

  • Query nodes maintain a recently-accessed cache of entity signal states
  • Cache populated by: (a) piggybacking on replication stream, (b) LRU cache filled by previous queries
  • Hot entities (trending, frequently queried) are cached. Cold entities require a data shard read.
  • Expected cache hit rate for personalized feeds: 60-80% (popular items repeat across users)
  • Cache miss penalty: same as Approach 1

Recommendation: Start with Approach 1 (batched parallel reads). Add Approach 2 (signal cache) when benchmarks show signal enrichment exceeding its latency budget (Section 7.4). The trait abstraction allows this evolution without changing the query executor.
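Approach 1 amounts to grouping candidates by owning shard and issuing one request per shard in parallel. A minimal sketch of the grouping and of the latency estimate used above; `shard_of` is a stand-in for whatever routing function the shard registry provides, and both function names are hypothetical.

```rust
use std::collections::BTreeMap;

/// Groups candidate item ids by owning shard so that each shard receives
/// one batched read. `shard_of` stands in for the real routing function.
pub fn batch_by_shard<F>(candidates: &[u64], shard_of: F) -> BTreeMap<usize, Vec<u64>>
where
    F: Fn(u64) -> usize,
{
    let mut batches: BTreeMap<usize, Vec<u64>> = BTreeMap::new();
    for &id in candidates {
        batches.entry(shard_of(id)).or_default().push(id);
    }
    batches
}

/// Estimated enrichment latency in microseconds when all shard reads run
/// in parallel: one network round trip plus the largest batch's local reads.
pub fn enrichment_latency_us(
    batches: &BTreeMap<usize, Vec<u64>>,
    per_entity_ns: f64,
    rtt_us: f64,
) -> f64 {
    let largest = batches.values().map(Vec::len).max().unwrap_or(0);
    rtt_us + (largest as f64 * per_entity_ns) / 1_000.0
}
```

With 200 candidates spread evenly over 4 shards, 500 ns per entity, and a 500 us round trip, the estimate is 500 + 25 = 525 us, matching the figure for Approach 1. The estimate depends on the largest batch, so a skewed candidate distribution raises latency even when the shard count is unchanged.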

7.4 Latency Budget Allocation

| Phase | Budget | Single-Node | Distributed |
|---|---|---|---|
| User context load | 3ms | ~100 us (local) | ~500 us (one shard read) |
| ANN retrieval | 10ms | ~5ms (local HNSW) | ~5ms (local HNSW replica) |
| Metadata filtering | 5ms | ~2ms (local) | ~2ms (local replica or shard reads) |
| Signal enrichment | 15ms | ~5 us (local hot-tier) | ~1-5ms (batched shard reads) |
| Scoring | 5ms | ~1ms (local) | ~1ms (local) |
| Diversity + assembly | 2ms | ~500 us (local) | ~500 us (local) |
| Total | 50ms | ~8ms | ~10-14ms |
| Headroom | | 42ms | 36-40ms |

The distributed case has ample headroom within the 50ms budget. The per-phase budgets sum to 40ms, so 10ms of the end-to-end budget remains unallocated. The dominant new cost is signal enrichment via cross-shard reads, which is bounded by network round-trip time, not computation.


8. Consistency Model

8.1 Consistency Requirements by Data Type

tidalDB is a ranking database. Ranking is inherently approximate. An engagement signal that arrives 100ms before a query vs 100ms after produces a negligibly different ranking. This relaxed correctness requirement enables a consistency model optimized for availability and latency.

| Data Type | Required Consistency | Rationale |
|---|---|---|
| Signal events (WAL) | Durable, ordered per entity | No signal loss. WAL is the source of truth. Per-entity ordering ensures decay computation correctness. Cross-entity ordering is not required (ranking is approximate). |
| Entity metadata | Read-your-writes | After update_item() returns, the next query from the same client must see the update. Stale reads from other clients are acceptable for up to 1 second. |
| Signal aggregates (hot-tier) | Eventual (bounded staleness) | Aggregates may lag signal events by up to the group commit delay (10ms) + replication lag (target: <5 seconds in distributed mode). This is acceptable because ranking tolerates staleness. |
| HNSW index | Eventual (bounded staleness) | New vectors are visible after the next index refresh (target: <30 seconds). Deleted vectors are filtered at query time via a deletion bitmap (immediate). |
| Tantivy index | Eventual (bounded staleness) | New documents visible after next Tantivy commit (target: <5 seconds). Same pattern as HNSW. |
| Cohort bitmaps | Eventual (bounded staleness) | Cohort membership reflects user attributes at last refresh. Static cohorts: <1 second (eager bitmap flip). Dynamic cohorts: refresh interval (1-6 hours). |
| Trending materialized views | Eventual (bounded staleness) | Trending rankings may lag by up to 5 seconds on the aggregation node. Acceptable for the "what is trending" use case. |
| Schema (signal defs, profiles) | Strong (linearizable) | Schema changes are infrequent and must be consistent across all nodes. Applied via a coordination protocol (Raft or simple leader-based). |

8.2 Consistency Guarantees for Applications

Guarantee 1: Signal durability. If db.signal() returns Ok(()) with Batched or Immediate durability, the signal event survives any single node failure. The WAL on the data shard is the guarantee.

Guarantee 2: Read-your-writes for entities. After db.write_item() returns, subsequent db.retrieve() from the same session reflects the update. Implemented by routing reads to the same data shard as writes (or by passing a write-version token).

Guarantee 3: Bounded staleness for ranking. All signal aggregates, trending views, and index updates are fresh within a configurable staleness bound (default: 5 seconds). The application can tighten this at the cost of more frequent flushes and higher I/O.

Guarantee 4: No phantom results. A ranking query never returns an entity that has been hard-deleted. Deletion is synchronous on the data shard and propagated to query nodes via the deletion bitmap (immediate invalidation) before index removal (background).

Guarantee 5: Monotonic reads within a session. A user who sees item X in their feed at time T will not see item X disappear at time T+1 due to replication lag (assuming the item was not actually deleted or hidden). This is enforced by serving repeated queries from the same query node within a session.
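Guarantee 2 mentions a write-version token as an alternative to pinning reads to the write shard. A minimal sketch of how such a token could drive replica selection, assuming each replica reports the highest write version it has applied. The struct, field, and function names are hypothetical and are not taken from the tidalDB API.

```rust
/// A replica and the highest write version it has applied.
/// Names are assumptions for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Replica {
    pub node_id: u32,
    pub applied_version: u64,
    pub is_leader: bool,
}

/// Picks a node that can serve a read for a session whose last write
/// produced `token`. A follower is used only if it has applied at least
/// that version; otherwise the read falls back to the leader.
pub fn pick_read_node(replicas: &[Replica], token: u64) -> Option<u32> {
    replicas
        .iter()
        .filter(|r| !r.is_leader && r.applied_version >= token)
        .map(|r| r.node_id)
        .next()
        .or_else(|| replicas.iter().find(|r| r.is_leader).map(|r| r.node_id))
}
```

A session that has never written can pass a token of 0, in which case any follower qualifies and the read is served with bounded staleness as in Guarantee 3.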

8.3 Conflict Resolution

In the distributed architecture, the only potential conflict is concurrent writes to the same entity on the same data shard. Since each entity is owned by exactly one data shard, there is no cross-shard conflict. Within a shard, the existing lock-free CAS-based signal update mechanism (Signal System spec, Section 4) handles concurrent writers correctly.

Schema changes (define_signal, define_profile, define_cohort) are serialized through a coordination service (embedded Raft or a lightweight leader-election protocol). This is the only operation that requires distributed consensus.


9. Replication Strategy

9.1 Data Shard Replication

Each data shard is a self-contained tidalDB instance with its own WAL, fjall keyspace, and redb tables. Replication uses WAL shipping:

Data Shard Replication via WAL Shipping

  +------------------+          +------------------+
  | Leader Shard 0   |  sealed  | Follower Shard 0 |
  |                  |  WAL     |    (replica)     |
  | WAL: write ------>  segments |                  |
  | fsync            |--------->| WAL: replay      |
  |                  |          | Apply to stores  |
  | Serves writes    |          | Serves reads     |
  +------------------+          +------------------+

Mechanism:

  1. The leader shard writes to its WAL and serves all writes.
  2. When a WAL segment is sealed (full, or on a timer), it is shipped to follower shards.
  3. Followers replay the sealed segment, applying records to their local stores.
  4. Followers can serve read queries (with bounded staleness equal to the replication lag).

Replication lag target: <5 seconds. This is the lag between a signal event being written on the leader and being visible on a follower. Given the WAL segment size of 64 MiB and sustained write throughput of 5 MB/s (Tier 2), a segment fills in ~13 seconds. To achieve <5 second lag, a timer-based seal (every 5 seconds) triggers segment shipping before the segment is full.
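The seal rule described here is a simple disjunction of a size limit and an age limit. A minimal sketch, with the 64 MiB and 5 second values supplied by the caller; the `SealPolicy` type and its fields are illustrative names, not tidalDB configuration keys.

```rust
use std::time::Duration;

/// When to seal the active WAL segment and ship it to followers.
/// Type and field names are assumptions for this sketch.
pub struct SealPolicy {
    pub max_segment_bytes: u64,
    pub max_segment_age: Duration,
}

impl SealPolicy {
    /// Seal when the segment is full OR has been open for too long.
    pub fn should_seal(&self, segment_bytes: u64, segment_age: Duration) -> bool {
        segment_bytes >= self.max_segment_bytes || segment_age >= self.max_segment_age
    }
}

/// Seconds needed to fill a segment at a sustained write rate.
pub fn seconds_to_fill(segment_bytes: u64, write_bytes_per_sec: f64) -> f64 {
    segment_bytes as f64 / write_bytes_per_sec
}
```

At 5 MB/s a 64 MiB segment takes about 13.4 seconds to fill, so with a 5 second age limit the timer, not the size limit, is what triggers sealing at Tier 2 write rates.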

Replication factor: 2 by default (1 leader + 1 follower); 3 for high availability (1 leader + 2 followers). If the leader is lost, the follower with the highest replayed seqno is promoted.
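The promotion rule can be sketched as a maximum over follower status reports. This assumes each follower reports the last WAL seqno it has replayed. The tie-break toward the lowest node id is an addition of this sketch to make the choice deterministic, and the names are hypothetical.

```rust
/// A follower's replication progress. Names are assumptions for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FollowerStatus {
    pub node_id: u32,
    pub replayed_seqno: u64,
}

/// Chooses the follower to promote: highest replayed seqno wins.
/// Ties go to the lowest node_id so every observer picks the same node.
pub fn choose_new_leader(followers: &[FollowerStatus]) -> Option<u32> {
    followers
        .iter()
        .max_by(|a, b| {
            a.replayed_seqno
                .cmp(&b.replayed_seqno)
                .then(b.node_id.cmp(&a.node_id))
        })
        .map(|f| f.node_id)
}
```

Promoting the most caught-up follower minimizes the window of writes that clients must re-send after failover.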

9.2 HNSW Index Replication

The HNSW index is a derived index, rebuilt from entity store embedding columns. Replication options:

Option A: Ship the index file. Periodically (every 30 seconds to 5 minutes), the leader serializes the HNSW index to a file and ships it to query nodes. Index size: 32 GB for 10M items. At 25 Gbps network, transfer takes ~10 seconds. Incremental updates (only changed vectors) can reduce this.

Option B: Replay embedding writes. Query nodes maintain their own HNSW index. They receive a stream of embedding insert/update/delete operations from data shards and apply them locally. This avoids shipping the full index but requires the query node to perform HNSW insertions (which are more expensive than searches).

Recommendation: Option B for Phase 3 (moderate item counts, incremental updates are cheap). Option A as a fallback for periodic full rebuilds (crash recovery, new query node bootstrap).

9.3 Aggregation Node Replication

The aggregation node's state is derived from data shard WALs. It is replicated by having a standby aggregation node that tails the same WAL streams. On failure, the standby takes over with minimal lag.

9.4 Failover

| Component | Failure Mode | Recovery |
|---|---|---|
| Data shard leader | Process crash or node failure | Follower with highest seqno is promoted to leader. In-flight writes that were not yet replicated are re-sent by clients (dedup prevents double-counting). Recovery time: <10 seconds (Raft leader election or manual promotion). |
| Data shard follower | Process crash or node failure | Leader continues serving. Follower is replaced and catches up by replaying WAL from last checkpoint. No query impact if other followers exist. |
| Query node | Process crash or node failure | Stateless for query processing. Load balancer routes to another query node. HNSW index must be rebuilt or loaded (from snapshot or by replaying embedding stream). Recovery time for HNSW: depends on index size and rebuild method. |
| Aggregation node | Process crash or node failure | Standby aggregation node takes over. Rebuilds state by replaying WAL tails from all data shards. Recovery time: proportional to WAL tail length across all shards (target: <30 seconds). |

10. The Single-Node to Distributed Path

10.1 Phase Overview

Phase 1          Phase 2            Phase 3               Phase 4
Single Node      Read Replicas      Partitioned Signals    Sharded HNSW
                                    + Aggregation Node     + Multi-Node
+---------+     +---------+        +---------+            +---------+
|         |     | Leader  |        | Leader  |            | Data    |
| tidalDB |     | tidalDB +------->| Data    +--stream--->| Shards  |
| (all in |     |         |        | Shard   |   |        | (N)     |
| one)    |     +---------+        +---------+   |        +---------+
|         |          |                  |         |             |
+---------+     +---------+        +---------+   |        +---------+
                | Follower|        | Follower|   |        | Query   |
                | tidalDB |        | Read    |   +------->| Nodes   |
                | (reads) |        | Replica |   |        | (HNSW)  |
                +---------+        +---------+   |        +---------+
                                                 |             |
                                            +---------+   +---------+
                                            | Aggreg. |   | Aggreg. |
                                            | Node    |   | Nodes   |
                                            +---------+   +---------+

Items:   1-16M         1-16M           1-100M            100M-1B
Users:   100K-5M       100K-5M         1M-40M            10M-100M
Signals: 10M-500M/day  10M-500M/day    100M-4B/day       1B-10B/day

10.2 Phase 1: Single Node (Current Target)

What it is: A single tidalDB process running all subsystems: WAL, hybrid storage (fjall + redb), HNSW, Tantivy, signal system, query engine, background materializer.

Capacity: Up to ~16M items (f16) or ~60M items (uint8), 5M users, 500M signals/day on a 64 GB node.

What is built:

  • Everything specified in specs 01-11.
  • Key encoding with entity-id prefix (already shard-ready).
  • Per-entity-type storage isolation (already maps to independent shards).
  • WAL with self-contained segments (already shippable for replication).
  • Trait-abstracted storage engine, vector index, text index.
  • All operations are per-entity-scoped (no cross-entity storage transactions).

What stays the same in all future phases:

  • Key encoding format.
  • WAL record format and segment structure.
  • Storage trait (StorageEngine, VectorIndex, TextIndex).
  • Signal write path (dedup, WAL, hot-tier update, warm-tier update).
  • Background materializer logic.
  • Query language and ranking profile execution.
  • Checkpoint and crash recovery mechanism.

10.3 Phase 2: Read Replicas (Scale Queries)

When: Query throughput exceeds what a single node can serve (>1,600 queries/sec at ~10ms of CPU time each, which requires >16 cores dedicated to query processing).

What changes:

| Component | Change |
|---|---|
| WAL | Leader ships sealed segments to followers. |
| Followers | New process role: replay WAL, serve read queries. Identical codebase, different startup flag (--role=follower). |
| Query routing | Thin load balancer routes read queries to followers. Write queries route to leader. |
| Consistency | Followers serve reads with bounded staleness (replication lag). Read-your-writes consistency via session affinity to the leader. |

What stays the same:

  • Single WAL, single data shard. No partitioning.
  • HNSW index on leader, replicated to followers via WAL replay of embedding writes.
  • All signal processing on leader. Followers read materialized signal state.

Code changes: ~500-1,000 lines. WAL segment shipping (background thread on leader, replay loop on follower). Load balancer configuration (external, not in tidalDB code). Startup flag for role selection.

10.4 Phase 3: Partitioned Signal Aggregation (Scale Signal Writes)

When: Signal write throughput exceeds single-node capacity (~50K-100K events/sec sustained), or item count exceeds HNSW memory on a single node (~16-60M items).

What changes:

| Component | Change |
|---|---|
| Data shards | Multiple tidalDB instances, each owning a range of entity_ids. Same codebase, configured with a shard range. |
| Signal router | New component: routes db.signal() calls to the correct data shard by hash(item_id). |
| Aggregation node | New component: tails WAL streams from all data shards, maintains global + cohort velocity and trending materialized views. |
| Query nodes | Serve ranking queries with local HNSW replica. Read signal data from data shards or signal cache. |
| Entity routing | StorageEngine trait gets a new implementation: ShardedStorage that routes by entity_id prefix. |

What stays the same:

  • Each data shard is a complete single-node tidalDB instance for its entity range. Same WAL, same hybrid storage, same checkpoint, same materializer.
  • Key encoding unchanged. Shard boundary is a range split on the 8-byte entity_id prefix.
  • Signal write path within a shard is unchanged.
  • Ranking profile execution unchanged.

Code changes: ~3,000-5,000 lines. Signal router, shard registry, WAL tailing for aggregation node, sharded storage implementation, query node signal cache, inter-node RPC (gRPC or custom protocol).

10.5 Phase 4: Sharded HNSW (Scale Vector Search Beyond Single-Node Memory)

When: Item count exceeds what fits in a single query node's HNSW index (~50-200M items depending on quantization and machine size).

What changes:

| Component | Change |
|---|---|
| HNSW index | Split into IVF partitions. With K partitions spread across N query nodes, each query node holds roughly K/N partitions. |
| Query routing | Query router computes nearest centroids and routes ANN search to the query nodes holding those partitions. |
| VectorIndex trait | New implementation: PartitionedVectorIndex that fans out to partition-holding query nodes and merges results. |

What stays the same:

  • Everything from Phase 3. Data shards, signal routing, aggregation, WAL, storage.
  • The VectorIndex::search() API. Callers do not know the index is partitioned.
  • Ranking profile execution. It receives candidate lists regardless of how they were generated.

Code changes: ~2,000-3,000 lines. IVF partitioning (k-means over embedding space, partition assignment), partitioned search with fan-out and merge, centroid index, partition placement on query nodes.
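The query-side half of IVF partitioning has two steps: pick the partitions whose centroids are closest to the query vector, then merge the per-partition results into one top-k list. A minimal sketch of both, assuming squared Euclidean distance and a probe count `nprobe`. The function names and the choice of distance metric are assumptions of this sketch, not the planned PartitionedVectorIndex implementation.

```rust
/// Squared Euclidean distance between two equal-length vectors.
fn squared_l2(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Returns the indices of the `nprobe` centroids closest to `query`,
/// nearest first. These identify the partitions to search.
pub fn nearest_partitions(query: &[f32], centroids: &[Vec<f32>], nprobe: usize) -> Vec<usize> {
    let mut order: Vec<(usize, f32)> = centroids
        .iter()
        .enumerate()
        .map(|(i, c)| (i, squared_l2(query, c)))
        .collect();
    order.sort_by(|a, b| a.1.total_cmp(&b.1));
    order.into_iter().take(nprobe).map(|(i, _)| i).collect()
}

/// Merges per-partition results of (item_id, distance) into a global
/// top-k ordered by ascending distance.
pub fn merge_top_k(per_partition: Vec<Vec<(u64, f32)>>, k: usize) -> Vec<(u64, f32)> {
    let mut all: Vec<(u64, f32)> = per_partition.into_iter().flatten().collect();
    all.sort_by(|a, b| a.1.total_cmp(&b.1));
    all.truncate(k);
    all
}
```

Raising `nprobe` trades fan-out (more query nodes touched per search) for recall, which is the main tuning knob this phase introduces.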

10.6 Phase Summary

| Phase | Trigger | New Components | Lines Changed | Items | Signals/Day |
|---|---|---|---|---|---|
| 1 | Initial launch | None (single process) | 0 | 1-16M | 10M-500M |
| 2 | Query throughput | WAL shipping, follower role, load balancer | ~1K | 1-16M | 10M-500M |
| 3 | Signal throughput or item count | Shard router, aggregation node, query nodes | ~4K | 1-100M | 100M-4B |
| 4 | Item count (HNSW memory) | IVF partitioning, partitioned vector search | ~2.5K | 100M-1B | 1B-10B |

11. Operational Considerations

11.1 Partition Rebalancing

When a data shard grows too large (by entity count or storage size), it must be split. The entity-id prefix encoding enables clean splits:

Split procedure:

  1. Choose a split point in the entity-id range (e.g., midpoint of the shard's range).
  2. Stop writes to the shard (briefly, <1 second, by buffering in the signal router).
  3. Copy all keys with entity_id >= split_point to a new shard.
  4. Update the shard registry (shard_id -> entity_id_range mapping).
  5. Resume writes. New events for entities >= split_point route to the new shard.
  6. Background: the old shard garbage-collects keys for entities that moved.

No entity is split. Because all keys for an entity share the same 8-byte prefix, a split never bisects an entity's data. This is the critical property enabled by the key encoding design in Storage Engine spec Section 5.
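The no-bisection property depends on the prefix being big-endian, so that byte-wise key order equals numeric entity_id order. A minimal sketch of such an encoding; the suffix layout is an assumption of this sketch, and the authoritative format is the one in the Storage Engine spec.

```rust
/// Encodes a storage key as an 8-byte big-endian entity_id prefix followed
/// by an opaque suffix. The suffix layout is an assumption of this sketch.
pub fn encode_key(entity_id: u64, suffix: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + suffix.len());
    key.extend_from_slice(&entity_id.to_be_bytes());
    key.extend_from_slice(suffix);
    key
}

/// Recovers the owning entity_id from a key, or None if the key is
/// shorter than the 8-byte prefix.
pub fn entity_of(key: &[u8]) -> Option<u64> {
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(key.get(..8)?);
    Some(u64::from_be_bytes(prefix))
}
```

Because every key for entity 1 sorts before every key for entity 256 regardless of suffix, a split point expressed as an entity_id cleanly separates the keyspace. A little-endian prefix would not have this property.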

When to split: When a shard exceeds a configurable size threshold (default: 1 TB) or entity count threshold (default: 25M entities).

Rebalancing is offline-safe. Because each shard is a self-contained tidalDB instance, a split can be performed by: (a) taking a snapshot (checkpoint + WAL copy) of the old shard, (b) starting the new shard from the snapshot with a range filter, (c) catching up from the old shard's WAL for events that arrived during the copy.
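The registry update in step 4 of the split procedure is small if the registry stores only range start points in an ordered map. A minimal sketch using `BTreeMap`; the `ShardRegistry` type and its methods are hypothetical and model only the range-to-shard mapping, not data movement.

```rust
use std::collections::BTreeMap;

/// Maps the inclusive start of each entity_id range to the shard that owns
/// it. A range runs from its start up to (not including) the next start.
pub struct ShardRegistry {
    starts: BTreeMap<u64, u32>,
}

impl ShardRegistry {
    /// Creates a registry where one shard owns the whole u64 keyspace.
    pub fn new(initial_shard: u32) -> Self {
        let mut starts = BTreeMap::new();
        starts.insert(0, initial_shard);
        Self { starts }
    }

    /// Finds the shard owning `entity_id`: the greatest start <= entity_id.
    pub fn shard_for(&self, entity_id: u64) -> Option<u32> {
        self.starts.range(..=entity_id).next_back().map(|(_, s)| *s)
    }

    /// Registers a split: entities >= split_point (up to the next range
    /// start) move to `new_shard`. Returns false if split_point is already
    /// a range boundary, in which case nothing changes.
    pub fn split(&mut self, split_point: u64, new_shard: u32) -> bool {
        if self.starts.contains_key(&split_point) {
            return false;
        }
        self.starts.insert(split_point, new_shard);
        true
    }
}
```

Lookup is a single ordered-map range query, and a split is a single insert, so the registry update itself fits comfortably inside the brief write pause in step 2.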

11.2 Monitoring

| Metric | Source | Alert Threshold |
|---|---|---|
| Signal write latency (p50, p99) | Data shard | p99 > 1ms |
| Ranking query latency (p50, p99) | Query node | p99 > 50ms |
| Trending query latency | Aggregation node | p99 > 30ms |
| WAL replication lag (seconds) | Follower / aggregation | > 10 seconds |
| Hot-tier entity count | Data shard | > 80% of max_hot_entities |
| HNSW index freshness (seconds since last update) | Query node | > 60 seconds |
| Tantivy index freshness | Query node | > 30 seconds |
| Materializer staleness | Data shard, aggregation node | Minute rollup > 2 minutes late |
| Cohort bitmap freshness | Aggregation node | > 2x refresh interval |
| Disk usage per shard | Data shard | > 80% of capacity |
| Signal dedup bloom filter FPR | Data shard | > 5% (bloom filter needs resizing) |
| Cross-shard read latency (signal enrichment) | Query node | p99 > 5ms |

11.3 Capacity Planning Formulas

Memory per data shard:

M_shard = (64 * entities_in_shard * active_signals_per_entity)     # hot-tier
        + (1800 * entities_in_shard * 0.05 * active_signals)       # warm-tier (5% active)
        + (512 * entities_in_shard)                                 # metadata cache
        + (22 * users_in_shard)                                     # cohort memberships

Memory per query node:

M_query = (3200 * total_items)            # HNSW index (f16, 1536d)
        + (0.20 * text_data_size)         # Tantivy index
        + signal_cache_budget             # configurable, default 4 GB

Memory per aggregation node:

M_agg = (20 * cohort_tracked_items * active_signals * (56 + exact_segments))   # cohort counters
      + (top_k_lists * items_per_list * 16)                                     # materialized trending

Disk per data shard (7-day retention):

D_shard = (events_per_day * 64 * 7)              # raw events (64B avg, 7 days, 2x WA for FIFO)
        + (entities * 32 * active_signals * 10)   # SIG keys (leveled, 10x WA)
        + (entities * 512)                         # metadata (redb, minimal WA)
        + (hourly_rollup_bytes * 720)             # MV rollups (30 days)
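The memory formulas above translate directly into code, which makes it easy to check them against figures quoted elsewhere in this spec. A minimal sketch of M_query and M_shard in bytes; the function and parameter names are illustrative, and the constants are copied from the formulas as written.

```rust
/// Memory per query node in bytes, following M_query.
pub fn query_node_memory_bytes(
    total_items: u64,
    text_data_bytes: u64,
    signal_cache_bytes: u64,
) -> u64 {
    let hnsw = 3_200 * total_items; // HNSW index (f16, 1536d)
    let tantivy = text_data_bytes / 5; // 0.20 * text_data_size
    hnsw + tantivy + signal_cache_bytes
}

/// Memory per data shard in bytes, following M_shard.
pub fn data_shard_memory_bytes(entities: u64, users: u64, active_signals: u64) -> u64 {
    let hot = 64 * entities * active_signals; // hot-tier
    let warm = 1_800 * entities * active_signals / 20; // warm-tier (5% active)
    let metadata = 512 * entities; // metadata cache
    let cohorts = 22 * users; // cohort memberships
    hot + warm + metadata + cohorts
}
```

For 10M items with no text data or signal cache, `query_node_memory_bytes` returns 32,000,000,000 bytes, which agrees with the 32 GB index size used in Section 9.2.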

11.4 Cost Model

| Scale Tier | Node Configuration | Count | Monthly Cost (Cloud Estimate) |
|---|---|---|---|
| Tier 1 (1M items) | 1x 64GB / 16 core / 2TB NVMe | 1 | ~$500-800 |
| Tier 2 (10M items) | 1x 64GB leader + 1x 64GB follower | 2 | ~$1,000-1,600 |
| Tier 3 (100M items) | 4x 128GB data shards + 2x 512GB query nodes + 1x 64GB aggregation | 7 | ~$8,000-12,000 |
| Tier 4 (1B items) | 16x 128GB data shards + 8x 512GB query nodes + 2x 128GB aggregation | 26 | ~$30,000-50,000 |

The cost driver at Tier 3+ is query node memory for the HNSW index. Using uint8 quantization (4x compression relative to f32, 2x relative to the f16 baseline, ~3-5% recall loss) halves the query node count and cost. Using DiskANN (SSD-backed) could eliminate the need for 512 GB query nodes entirely, at the cost of higher ANN latency.


12. Prior Art and Lessons Learned

12.1 Elasticsearch

Architecture: Hash-based shard routing. Documents assigned to shards by hash(doc_id) % num_shards. Shard count fixed at index creation. Coordinating nodes scatter-gather across all shards for every search query.

Lesson learned: Fixed shard count at index creation is a scaling trap. Elasticsearch's inability to change the primary shard count without reindexing (or a split/shrink into a new index) is a well-known source of operational pain. tidalDB avoids this by using range-based sharding on entity_id with dynamic split/merge, following CockroachDB's model.

Lesson learned: Scatter-gather across all shards for every query is expensive at high shard counts. Elasticsearch mitigates with adaptive replica selection and caching, but fundamentally every search query touches every shard. tidalDB avoids this for the common case (ranking queries) by replicating the HNSW index to query nodes, making ANN local.

Source: Elasticsearch shard routing, Shard allocation.

12.2 Redis Cluster

Architecture: 16,384 hash slots, distributed across master nodes. slot = CRC16(key) % 16384. Each master owns a subset of slots. Multi-key operations require all keys to hash to the same slot (hash tags).

Lesson learned: Hash slot partitioning is simple and even, but it prevents efficient range scans. Redis Cluster cannot answer "give me all keys in range [A, B]" without scanning all slots. tidalDB's key encoding uses big-endian entity_id prefixes specifically to preserve range scan efficiency across entity-scoped data, while still supporting hash-based shard routing on the entity_id.

Lesson learned: The 16,384-slot limit is a practical ceiling on cluster size. Redis chose this to keep the slot bitmap at 2 KB per node. tidalDB's dynamic range splitting has no fixed partition count -- shards split as needed, limited only by the u64 entity_id keyspace.

Source: Redis Cluster specification, Hash slot distribution.

12.3 CockroachDB / TiDB

Architecture: Range-based partitioning on ordered keys. The keyspace is divided into contiguous ranges (tens to hundreds of MB each, depending on the system's split threshold). Ranges split and merge automatically based on size and load. A Placement Driver (PD in TiDB) coordinates range metadata and rebalancing.

Lesson learned: Range-based partitioning is superior to hash-based for workloads with range scans, which includes most SQL workloads and tidalDB's entity-prefix-scan pattern. CockroachDB explicitly chose range over hash for this reason. tidalDB adopts range-based partitioning with entity_id as the range key.

Lesson learned: Automatic split/merge based on size AND load is critical. CockroachDB's default split threshold is 512 MiB per range; TiDB defaults to 96 MiB. But size alone is insufficient -- a small range serving 10,000 QPS needs splitting for load distribution, not storage. tidalDB must split based on both entity count and signal write throughput.

Lesson learned: A lightweight metadata service (CockroachDB's meta ranges, TiDB's PD) that tracks range-to-node mapping is essential. This service must be highly available (replicated via Raft) but handles very low throughput (range metadata changes infrequently).

Source: CockroachDB partitioning, TiDB architecture, TiDB scheduling.

12.4 Pinecone / Milvus / Qdrant

Architecture (common pattern): Separation of storage, indexing, and query serving. Data is ingested to storage nodes, indexes are built on index nodes or inline, and query nodes hold index replicas for serving.

Lesson learned (Pinecone): Scaling along two dimensions -- replicas for throughput, shards for capacity -- is the right model for vector databases. Pinecone's pod architecture makes this explicit. tidalDB's Option C follows this with query nodes (replicas for query throughput) and data shards (capacity for entity storage).

Lesson learned (Milvus): Full separation of compute and storage (query nodes are stateless, data in S3) enables elastic scaling but adds latency for cold-start queries. tidalDB keeps query nodes stateful (HNSW in memory) for sub-10ms ANN latency, accepting the cost of replication.

Lesson learned (Qdrant): Qdrant uses Raft for cluster topology consensus but NOT for point operations. Point writes do not go through consensus, reducing write latency. tidalDB follows the same model: schema changes use consensus, signal writes do not.

Source: Pinecone dedicated read nodes, Milvus architecture, Qdrant distributed deployment.

12.5 ClickHouse / Apache Druid

Architecture (ClickHouse): Share-nothing, sharded by a configurable sharding key. Distributed table engine acts as a proxy, forwarding queries to shards and aggregating results. Materialized views pre-aggregate data on ingestion.

Architecture (Druid): Time-partitioned immutable segments. Columnar storage with LZ4 compression. Real-time ingestion nodes + historical query nodes. Segments are 300-700 MB, partitioned by time interval.

Lesson learned (ClickHouse): Materialized views that pre-aggregate on ingestion are the key to fast analytical queries at scale. tidalDB's aggregation node follows this pattern exactly: signal events are pre-aggregated into velocity and trending materializations as they stream in.

Lesson learned (Druid): Time-based partitioning is natural for event data. Druid's segment model (immutable, time-bounded, independently loadable) maps directly to tidalDB's WAL segment model and FIFO-compacted event log. tidalDB's EVT keys are already time-ordered within each entity, enabling efficient time-range queries and retention-based cleanup.
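
The sketch below illustrates why time-ordered keys make range queries and retention cleanup cheap. It uses a `BTreeMap` as a stand-in for the ordered store and assumes the EVT key suffix is a big-endian `u64` timestamp. That suffix layout and the names `evt_key`, `timestamps_in_range`, and `expire_before` are assumptions for illustration, not the storage engine's API.

```rust
use std::collections::BTreeMap;

/// Builds [entity_id BE][0x00]["EVT"][timestamp BE]. The timestamp suffix
/// is an assumed layout; big-endian keeps byte order equal to time order.
pub fn evt_key(entity_id: u64, ts: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(20);
    key.extend_from_slice(&entity_id.to_be_bytes());
    key.push(0x00);
    key.extend_from_slice(b"EVT");
    key.extend_from_slice(&ts.to_be_bytes());
    key
}

/// Reads the trailing 8-byte timestamp back out of an EVT key.
fn decode_ts(key: &[u8]) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&key[key.len() - 8..]);
    u64::from_be_bytes(raw)
}

/// Time-range query for one entity: a single ordered range scan,
/// inclusive on both ends.
pub fn timestamps_in_range(
    log: &BTreeMap<Vec<u8>, Vec<u8>>,
    entity_id: u64,
    from: u64,
    to: u64,
) -> Vec<u64> {
    if from > to {
        return Vec::new();
    }
    log.range(evt_key(entity_id, from)..=evt_key(entity_id, to))
        .map(|(key, _)| decode_ts(key))
        .collect()
}

/// Retention cleanup: removes this entity's events older than `cutoff`
/// and returns how many were dropped.
pub fn expire_before(
    log: &mut BTreeMap<Vec<u8>, Vec<u8>>,
    entity_id: u64,
    cutoff: u64,
) -> usize {
    let doomed: Vec<Vec<u8>> = log
        .range(evt_key(entity_id, 0)..evt_key(entity_id, cutoff))
        .map(|(key, _)| key.clone())
        .collect();
    for key in &doomed {
        log.remove(key);
    }
    doomed.len()
}
```

Both operations touch only the contiguous key range for one entity, so their cost scales with the number of matching events rather than with the size of the log.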

Lesson learned (both): Pre-aggregation is not optional at scale. Scanning raw events at query time is infeasible beyond ~1M events per query. ClickHouse's materialized views and Druid's roll-up aggregation both demonstrate that the only path to sub-second analytical queries at billion-event scale is pre-computation.
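
A minimal sketch of ingest-time pre-aggregation follows. Each event updates a fixed-width time bucket in constant time, and queries sum buckets instead of scanning raw events. `WindowedCounts` and its methods are hypothetical names for illustration. tidalDB's actual materializations (velocity, trending) are defined in the Signal System spec and are not reproduced here.

```rust
use std::collections::HashMap;

/// Hypothetical pre-aggregated event counts, bucketed by fixed-width windows.
pub struct WindowedCounts {
    bucket_secs: u64,
    /// (entity_id, bucket_index) -> number of events in that bucket.
    counts: HashMap<(u64, u64), u64>,
}

impl WindowedCounts {
    /// Panics if `bucket_secs` is zero.
    pub fn new(bucket_secs: u64) -> Self {
        assert!(bucket_secs > 0, "bucket width must be positive");
        Self { bucket_secs, counts: HashMap::new() }
    }

    /// Ingest path: one O(1) counter update per event.
    pub fn record(&mut self, entity_id: u64, ts_secs: u64) {
        let bucket = ts_secs / self.bucket_secs;
        *self.counts.entry((entity_id, bucket)).or_insert(0) += 1;
    }

    /// Query path: sums every bucket that overlaps [from_secs, to_secs].
    /// Cost depends on the number of buckets, not the number of events.
    /// Resolution is one bucket, so edge buckets are counted whole.
    pub fn count_between(&self, entity_id: u64, from_secs: u64, to_secs: u64) -> u64 {
        if from_secs > to_secs {
            return 0;
        }
        let first = from_secs / self.bucket_secs;
        let last = to_secs / self.bucket_secs;
        (first..=last)
            .filter_map(|bucket| self.counts.get(&(entity_id, bucket)))
            .sum()
    }
}
```

The trade-off is resolution: answers are exact only at bucket boundaries, which is the same compromise roll-up aggregation makes in exchange for bounded query cost.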

Source: ClickHouse architecture, ClickHouse sharding deep dive, Druid architecture, Druid partitioning.

12.6 Vespa

Architecture: Content nodes hold documents + indexes (including HNSW). Container nodes are stateless query processors. Content groups hold full replicas. Auto-sharding with bucket-based distribution. Ranking and inference execute on content nodes (compute-local).

Lesson learned: Vespa's "compute where the data lives" principle removes the expensive part of scatter-gather for ranking: candidate documents and their features never cross the network. Each content node scores its local documents, and the container node merges only the per-node top results. tidalDB's query node model adopts this: ANN + metadata filtering + scoring happen on the query node that holds the HNSW replica and cached metadata.
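
The compute-local pattern described in this lesson can be sketched as two steps: each node ranks its own documents and returns only its top k, then the merging node combines those short lists. `Scored`, `local_top_k`, and `merge_top_k` are hypothetical names. The merge here is a simple sort over the combined lists, not Vespa's or tidalDB's actual implementation.

```rust
/// A scored candidate. Only this small record crosses the network.
#[derive(Debug, Clone, PartialEq)]
pub struct Scored {
    pub entity_id: u64,
    pub score: f32,
}

/// Runs on the node that holds the data: rank locally, keep the best k.
/// Ties break on entity_id so results are deterministic.
pub fn local_top_k(mut scored: Vec<Scored>, k: usize) -> Vec<Scored> {
    scored.sort_by(|a, b| {
        // total_cmp gives a total order over f32, including NaN.
        b.score
            .total_cmp(&a.score)
            .then(a.entity_id.cmp(&b.entity_id))
    });
    scored.truncate(k);
    scored
}

/// Runs on the merging node: combine per-node top-k lists into a global top-k.
/// The input is at most (nodes * k) records, independent of corpus size.
pub fn merge_top_k(per_node: Vec<Vec<Scored>>, k: usize) -> Vec<Scored> {
    local_top_k(per_node.into_iter().flatten().collect(), k)
}
```

Because every node returns at most k records, the global top-k is guaranteed to be contained in the merged input, so the merge loses no results.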

Lesson learned: Vespa's content group model (each group is a full replica that can independently answer any query) provides clean horizontal scaling for read throughput. tidalDB's query nodes are analogous to Vespa content groups: each holds a full HNSW replica and can independently answer ANN queries.

Source: Vespa architecture, Vinted: Goodbye Elasticsearch, Hello Vespa, Vespa sizing guide.

12.7 DiskANN

Architecture: SSD-resident Vamana graph with PQ-compressed vectors in memory. Achieves 1B-vector search with ~96 GB RAM (vs 3 TB for HNSW).

Lesson learned: For the "delay distribution" strategy, DiskANN extends the single-node ceiling by 10-40x at the cost of 3-10x higher ANN latency (5-15ms vs 1-5ms). tidalDB should evaluate DiskANN as a Phase 2.5 option that delays the need for Phase 4 (sharded HNSW) by keeping the vector index on a single large-NVMe node.

Source: DiskANN paper, From 3 TB RAM to 96 GB, VLDB 2025: Turbocharging Vector DBs with Modern SSDs.


Appendix A: Key Encoding and Shard Routing

The entity-id prefix encoding from Storage Engine spec Section 5.6 is the foundation of the partitioning strategy. This appendix consolidates how it supports shard routing.

Shard Routing via Entity ID Prefix

```
Key:  [entity_id: u64 BE][0x00][TAG][suffix]
       ^^^^^^^^^^^^^^^^
       Shard routing key (first 8 bytes)

Shard assignment: range-based
  Shard 0: entity_id in [0x0000000000000000, split_point_1)
  Shard 1: entity_id in [split_point_1, split_point_2)
  ...
  Shard N: entity_id in [split_point_N, 0xFFFFFFFFFFFFFFFF]

Routing: shard = binary_search(shard_ranges, entity_id)
  Cost: O(log N) where N = number of shards. At 16 shards: 4 comparisons.

Guarantee: all keys for entity X (SIG, EVT, META, REL, MV, IDX) are on the
same shard because they share the same 8-byte entity_id prefix.
```
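
The routing rule above can be sketched in Rust as follows. `ShardMap`, `route`, `route_key`, and `encode_key` are hypothetical names, and treating the tag as a byte string is an assumption. The authoritative key layout is the one in the Storage Engine spec.

```rust
/// Hypothetical routing table. `split_points` holds the sorted lower bounds
/// of shards 1..=N; shard 0 implicitly starts at entity_id 0.
pub struct ShardMap {
    split_points: Vec<u64>,
}

impl ShardMap {
    /// Returns None unless split points are strictly increasing and nonzero.
    pub fn new(split_points: Vec<u64>) -> Option<Self> {
        let increasing = split_points.windows(2).all(|w| w[0] < w[1]);
        let nonzero = split_points.first().map_or(true, |&p| p > 0);
        (increasing && nonzero).then(|| Self { split_points })
    }

    pub fn shard_count(&self) -> usize {
        self.split_points.len() + 1
    }

    /// Binary search: the shard index equals the number of split points
    /// that are <= entity_id. O(log N) in the number of shards.
    pub fn route(&self, entity_id: u64) -> usize {
        self.split_points.partition_point(|&p| p <= entity_id)
    }

    /// Routes a raw storage key using only its 8-byte big-endian prefix.
    /// Returns None for keys shorter than 8 bytes.
    pub fn route_key(&self, key: &[u8]) -> Option<usize> {
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(key.get(..8)?);
        Some(self.route(u64::from_be_bytes(prefix)))
    }
}

/// Builds [entity_id: u64 BE][0x00][TAG][suffix].
pub fn encode_key(entity_id: u64, tag: &[u8], suffix: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(9 + tag.len() + suffix.len());
    key.extend_from_slice(&entity_id.to_be_bytes());
    key.push(0x00);
    key.extend_from_slice(tag);
    key.extend_from_slice(suffix);
    key
}
```

Because `route_key` never reads past the first 8 bytes, the tag and suffix cannot influence shard placement, which is what makes the co-location guarantee hold by construction.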

Appendix B: Invariant Checklist

| # | Invariant | Test Strategy |
|---|-----------|---------------|
| 1 | Shard routing is deterministic: the same entity_id always routes to the same shard for a given shard configuration. | Property test: generate random entity_ids, verify routing is a pure function of entity_id and shard_ranges. |
| 2 | Shard splits never bisect an entity's data. All keys for entity X remain on the same shard after a split. | Property test: simulate splits at random points, verify all keys for each entity share a shard. |
| 3 | WAL replication preserves ordering. Events replayed on a follower appear in the same seqno order as on the leader. | Integration test: write events to leader, replay on follower, compare seqno sequences. |
| 4 | Signal enrichment across shards produces the same scoring as single-node scoring. | Integration test: run same workload single-node and distributed, compare top-50 result sets (allowing for bounded staleness). |
| 5 | Aggregation node trending results are consistent with what would be computed from raw events on all shards. | Property test: compute trending from raw events (ground truth) and compare with aggregation node output. |
| 6 | HNSW index on query nodes is eventually consistent with entity store embeddings. | Integration test: insert/update/delete embeddings, wait for replication, verify query node index reflects changes. |
| 7 | After data shard failover, no acknowledged signal events are lost. | Crash test: kill leader shard at random points, verify follower contains all acknowledged events. |
| 8 | Schema changes are applied consistently across all nodes. | Integration test: define_signal on leader, verify all shards and query nodes reflect the new signal type. |
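
As an illustration of the test strategy for invariant 2, the sketch below simulates random splits and checks that every key of an entity still routes to one shard, including entities sitting directly on either side of a new split point. It uses a small deterministic generator instead of a property-testing crate so that it stays dependency-free. All function names are hypothetical, and a split is modelled simply as inserting one boundary.

```rust
/// Small deterministic xorshift64 generator; state must be nonzero.
fn next_u64(state: &mut u64) -> u64 {
    let mut x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;
    x
}

/// Range-based routing: shard index = number of split points <= entity_id.
fn route(split_points: &[u64], entity_id: u64) -> usize {
    split_points.partition_point(|&p| p <= entity_id)
}

/// Reads the 8-byte big-endian entity_id prefix of a storage key.
fn entity_prefix(key: &[u8]) -> u64 {
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&key[..8]);
    u64::from_be_bytes(prefix)
}

/// One key per tag for an entity, shaped as [entity_id BE][0x00][TAG].
fn keys_for(entity_id: u64) -> Vec<Vec<u8>> {
    ["SIG", "EVT", "META", "REL", "MV", "IDX"]
        .iter()
        .map(|tag| {
            let mut key = entity_id.to_be_bytes().to_vec();
            key.push(0x00);
            key.extend_from_slice(tag.as_bytes());
            key
        })
        .collect()
}

/// Models a shard split as inserting one new boundary (no-op if present or 0).
fn split(split_points: &[u64], at: u64) -> Vec<u64> {
    let mut next = split_points.to_vec();
    if at != 0 {
        if let Err(pos) = next.binary_search(&at) {
            next.insert(pos, at);
        }
    }
    next
}

/// True when every key of `entity_id` routes to the entity's own shard.
fn colocated(split_points: &[u64], entity_id: u64) -> bool {
    let shard = route(split_points, entity_id);
    keys_for(entity_id)
        .iter()
        .all(|key| route(split_points, entity_prefix(key)) == shard)
}

/// Applies `rounds` random splits; after each one, checks the boundary
/// entities and `samples` random entities for co-location.
pub fn check_split_invariant(seed: u64, rounds: usize, samples: usize) -> bool {
    let mut state = seed.max(1);
    let mut splits: Vec<u64> = Vec::new();
    for _ in 0..rounds {
        let at = next_u64(&mut state);
        splits = split(&splits, at);
        if !colocated(&splits, at) || !colocated(&splits, at.wrapping_sub(1)) {
            return false;
        }
        for _ in 0..samples {
            if !colocated(&splits, next_u64(&mut state)) {
                return false;
            }
        }
    }
    true
}
```

The check passes by construction as long as routing reads only the entity_id prefix. Its value is as a regression guard: it would fail if routing ever started to depend on the tag or suffix bytes.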

Appendix C: Configuration Reference

Distributed Mode Configuration

| Parameter | Default | Range | Description |
|-----------|---------|-------|-------------|
| cluster.mode | single | single, distributed | Operating mode. single = Phase 1. distributed = Phase 3+. |
| cluster.role | leader | leader, follower, query, aggregation | Node role in distributed mode. |
| cluster.shard_id | 0 | 0-65535 | This node's shard ID (data shard role only). |
| cluster.shard_ranges | [(0, u64::MAX)] | Vec of (start, end) pairs | Entity ID ranges for each shard. |
| replication.wal_ship_interval | 5 sec | 1-60 sec | How often to ship sealed WAL segments to followers. |
| replication.factor | 2 | 1-5 | Number of copies of each data shard (1 = no replication). |
| aggregation.stream_lag_target | 5 sec | 1-30 sec | Target maximum lag for aggregation node. |
| query.signal_cache_size | 4 GB | 512 MB - 64 GB | Memory budget for signal state cache on query nodes. |
| query.hnsw_replication_mode | stream | stream, snapshot | How query nodes receive HNSW updates. |
| shard.split_size_threshold | 1 TB | 256 GB - 4 TB | Data shard size that triggers a split recommendation. |
| shard.split_entity_threshold | 25M | 5M - 100M | Entity count that triggers a split recommendation. |
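
One way a subset of these parameters might be modelled in Rust is sketched below, with the defaults and ranges taken from the table above. The type and field names are hypothetical, and the shard-range check (starts strictly increasing, start <= end) is an assumption, since the table does not say whether range ends are inclusive.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterMode { Single, Distributed }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole { Leader, Follower, Query, Aggregation }

/// Hypothetical config struct covering a subset of the table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterConfig {
    pub mode: ClusterMode,             // cluster.mode
    pub role: NodeRole,                // cluster.role
    pub shard_id: u16,                 // cluster.shard_id (0-65535 fits u16)
    pub shard_ranges: Vec<(u64, u64)>, // cluster.shard_ranges
    pub wal_ship_interval: Duration,   // replication.wal_ship_interval
    pub replication_factor: u8,        // replication.factor
    pub stream_lag_target: Duration,   // aggregation.stream_lag_target
}

impl Default for ClusterConfig {
    /// Defaults copied from the table.
    fn default() -> Self {
        Self {
            mode: ClusterMode::Single,
            role: NodeRole::Leader,
            shard_id: 0,
            shard_ranges: vec![(0, u64::MAX)],
            wal_ship_interval: Duration::from_secs(5),
            replication_factor: 2,
            stream_lag_target: Duration::from_secs(5),
        }
    }
}

impl ClusterConfig {
    /// Checks each value against the documented range.
    pub fn validate(&self) -> Result<(), String> {
        let ship = self.wal_ship_interval.as_secs();
        if !(1..=60).contains(&ship) {
            return Err(format!("wal_ship_interval {ship}s outside 1-60 sec"));
        }
        if !(1..=5).contains(&self.replication_factor) {
            return Err(format!("replication.factor {} outside 1-5", self.replication_factor));
        }
        let lag = self.stream_lag_target.as_secs();
        if !(1..=30).contains(&lag) {
            return Err(format!("stream_lag_target {lag}s outside 1-30 sec"));
        }
        if self.shard_ranges.is_empty() {
            return Err("shard_ranges must not be empty".to_string());
        }
        if self.shard_ranges.iter().any(|&(start, end)| start > end) {
            return Err("shard range start exceeds end".to_string());
        }
        // Assumption: ranges are listed in ascending order of start.
        if self.shard_ranges.windows(2).any(|w| w[0].0 >= w[1].0) {
            return Err("shard range starts must be strictly increasing".to_string());
        }
        Ok(())
    }
}
```

Validating at load time turns an out-of-range value into a startup error instead of a runtime surprise, for example a replication factor of 9 is rejected before any node joins the cluster.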

Appendix D: References

  1. Storage Engine Specification (01). docs/specs/01-storage-engine.md. Key encoding, hybrid backend, WAL, checkpoint, tiered storage.
  2. Signal System Specification (03). docs/specs/03-signal-system.md. Signal aggregation, cohort-scoped signals, materializer, performance targets.
  3. Cohort Specification (05). docs/specs/05-cohorts.md. Cohort types, dimensional hierarchy, accuracy analysis.
  4. Vector Retrieval Specification (07). docs/specs/07-vector-retrieval.md. HNSW parameters, quantization, trait abstraction.
  5. VISION.md. VISION.md. Single-node-first philosophy, product requirements.
  6. thoughts.md. thoughts.md. Lessons from Engram, Citadel, StemeDB. Hybrid backend routing, WAL shipping, materialized views.
  7. Taft, R., et al. "CockroachDB: The Resilient Geo-Distributed SQL Database." SIGMOD 2020. Range-based partitioning, Raft consensus.
  8. Huang, D., et al. "TiDB: A Raft-based HTAP Database." VLDB 2020. Placement driver, region split/merge, load-based splitting.
  9. Elasticsearch documentation. "Search shard routing." https://www.elastic.co/docs/reference/elasticsearch/rest-apis/search-shard-routing. Hash-based shard routing, adaptive replica selection.
  10. Redis documentation. "Cluster specification." https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/. Hash slot partitioning, 16384 slots.
  11. Subramanya, S.J., et al. "DiskANN: Fast Accurate Billion-point Nearest Neighbor Search on a Single Node." NeurIPS 2019. SSD-resident graph index.
  12. Lin, W. "From 3 TB RAM to 96 GB: superseding billion vector HNSW with 40x cheaper DiskANN." 2024. https://blog.wilsonl.in/diskann/. Production DiskANN experience.
  13. Qdrant documentation. "Distributed Deployment." https://qdrant.tech/documentation/guides/distributed_deployment/. Raft for topology, no consensus for point operations.
  14. Vespa documentation. "Architecture." https://vespa.ai/architecture/. Content groups, compute-local ranking, auto-sharding.
  15. Bergum, J.K. "Billion-scale vector search with Vespa." 2023. Full-replica content groups for vector search at scale.
  16. ClickHouse/Altinity. "Deep Dive on ClickHouse Sharding and Replication." 2024. https://altinity.com/wp-content/uploads/2024/05/Deep-Dive-on-ClickHouse-Sharding-and-Replication-2024-1-1.pdf. Distributed table engine, materialized views.
  17. Apache Druid documentation. "Segments." https://druid.apache.org/docs/latest/design/segments/. Time-partitioned immutable segments, columnar storage.
  18. Milvus documentation. "Architecture." 2024. Separation of storage, compute, and metadata for vector databases.
  19. Pinecone. "Dedicated Read Nodes." InfoQ, 2025. https://www.infoq.com/news/2025/12/pinecone-drn-vector-workloads/. Shards for capacity, replicas for throughput.
  20. Vinted Engineering. "Goodbye Elasticsearch, Hello Vespa." 2024. https://vinted.engineering/2024/09/05/goodbye-elasticsearch-hello-vespa/. Vespa scaling advantages over Elasticsearch shard model.