Task 03: Background Syncer
Delivers
TextIndexSyncer: a background thread that reads entity store writes (tracked via a sequence counter), feeds the Tantivy writer, commits on an interval (every 1000 docs or 2 seconds, whichever comes first), and stores the last-processed sequence number in the commit payload. On crash recovery, it reads the commit payload to find the resume point and replays from the entity store.
Complexity: L
Dependencies
- Task 01 complete: TextIndex, TextIndexConfig
- Task 02 complete: TextIndexWriter, commit(seq), last_committed_seq()
- StorageEngine trait with scan_prefix() for rebuild
Technical Design
Approach
Use an outbox sequence counter approach. The entity store write path increments a shared AtomicU64 sequence counter each time an item is written. The syncer reads this counter and processes any items with sequence numbers above its last committed value.
A simpler option for the initial m5p1 implementation:
- The syncer runs on a configurable interval (default: 2 seconds)
- On each tick, it scans ALL items from the entity store and re-indexes them if their sequence number is higher than last committed
- A more sophisticated outbox pattern (WAL-based) is deferred to future work
This is correct but inefficient: a full scan guarantees correctness, while partial updates would only improve throughput. For 10K items, a full rebuild takes < 1 second, so the cost is acceptable.
Given the WAL sequence numbers and the entity store, the simplest correct approach is:
- Maintain a monotonic write_counter: AtomicU64 in TidalDb that increments on each write_item_with_metadata() call
- The syncer checks whether write_counter > last_committed_seq and, if so, does a full index rebuild
- This guarantees correctness at the cost of always doing a full rebuild (acceptable for 10K items)
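The counter check in the list above can be sketched as follows. This is a minimal illustration, not the TidalDb implementation: WriteCounter, record_write(), and needs_rebuild() are hypothetical names, and the sketch assumes sequence numbers start at 1 so that 0 means "nothing committed yet".

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the write counter that TidalDb would own.
#[derive(Default)]
pub struct WriteCounter {
    seq: AtomicU64,
}

impl WriteCounter {
    pub fn new() -> Self {
        Self::default()
    }

    /// Called once per item write; returns the sequence number assigned to it.
    pub fn record_write(&self) -> u64 {
        // fetch_add returns the previous value, so add 1 to get the new sequence.
        self.seq.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Highest sequence number handed out so far (0 if no writes yet).
    pub fn current(&self) -> u64 {
        self.seq.load(Ordering::SeqCst)
    }
}

/// True when at least one write happened after the last committed sequence,
/// i.e. the index is stale and the syncer should rebuild.
pub fn needs_rebuild(counter: &WriteCounter, last_committed_seq: u64) -> bool {
    counter.current() > last_committed_seq
}
```

The syncer would call needs_rebuild() on each tick and skip the rebuild entirely when no writes have occurred since the last commit.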
For a more sophisticated approach with incremental updates, we track which entity IDs have been updated since the last commit via a concurrent queue:
// In TidalDb: a channel where item writes post (entity_id, write_seq) pairs
pending_text_updates: crossbeam::channel::Sender<(EntityId, u64)>
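The syncer receives these events, batches them, and commits on an interval. The PendingWrite struct below extends the (entity_id, write_seq) pair with the item's metadata and a delete flag.
Use the channel approach: it is more efficient than a full rebuild on every change and correctly implements the outbox pattern.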
TextIndexSyncer
// tidal/src/text/syncer.rs
use std::sync::Arc;
use std::time::{Duration, Instant};
use crossbeam::channel::{Receiver, RecvTimeoutError};
use crate::schema::EntityId;
use crate::text::index::TextIndex;
use crate::storage::StorageEngine;
use crate::TidalError;
/// A pending write event: the entity ID, its text metadata, and the WAL
/// sequence number of the write.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    pub entity_id: EntityId,
    pub metadata: std::collections::HashMap<String, String>,
    pub seq: u64,
    /// If true, this is a delete (item was removed).
    pub deleted: bool,
}

/// Background syncer that feeds the Tantivy text index from the entity store outbox.
pub struct TextIndexSyncer {
    index: Arc<TextIndex>,
    rx: Receiver<PendingWrite>,
    commit_every_n: usize,
    commit_every: Duration,
}

impl TextIndexSyncer {
    pub fn new(
        index: Arc<TextIndex>,
        rx: Receiver<PendingWrite>,
        commit_every_n: usize,
        commit_every_secs: u64,
    ) -> Self {
        Self {
            index,
            rx,
            commit_every_n,
            commit_every: Duration::from_secs(commit_every_secs),
        }
    }

    /// Run the syncer loop. Blocks until the channel is closed (sender dropped).
    ///
    /// This is intended to run on a dedicated background thread.
    pub fn run(self) -> crate::Result<()> {
        let mut pending_count = 0usize;
        let mut batch_started = Instant::now();
        let mut last_seq = 0u64;
        let mut writer = self.index.writer_guard()?;
        loop {
            // Wake at least every 100 ms so the time-based commit can fire.
            match self.rx.recv_timeout(Duration::from_millis(100)) {
                Ok(update) => {
                    if update.deleted {
                        writer.delete_item(update.entity_id);
                    } else {
                        writer.index_item(update.entity_id, &update.metadata)?;
                    }
                    last_seq = last_seq.max(update.seq);
                    if pending_count == 0 {
                        // First write of a new batch: start the interval clock here.
                        batch_started = Instant::now();
                    }
                    pending_count += 1;
                }
                // Nothing arrived; fall through to the commit check below.
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
                    // Channel closed: flush remaining documents and exit.
                    if pending_count > 0 {
                        writer.commit(last_seq)?;
                    }
                    break;
                }
            }
            // Commit when the batch is full or the interval has elapsed. Checking
            // on every iteration (not only on timeout) keeps a steady trickle of
            // writes from postponing the time-based commit indefinitely.
            if pending_count > 0
                && (pending_count >= self.commit_every_n
                    || batch_started.elapsed() >= self.commit_every)
            {
                writer.commit(last_seq)?;
                pending_count = 0;
            }
        }
        Ok(())
    }
}
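The commit rule inside run() (a full batch or an elapsed interval, and never an empty commit) can be isolated as a pure function, which makes it testable without threads or a Tantivy index. The sketch below is only an illustration under that assumption; should_commit() is a hypothetical helper and is not part of the task's API.

```rust
use std::time::Duration;

/// Decide whether the syncer should commit now.
///
/// `pending_count` is the number of uncommitted documents and
/// `since_batch_start` is the time elapsed on the interval clock.
pub fn should_commit(
    pending_count: usize,
    since_batch_start: Duration,
    commit_every_n: usize,
    commit_every: Duration,
) -> bool {
    if pending_count == 0 {
        // Nothing to flush: never produce an empty commit.
        return false;
    }
    pending_count >= commit_every_n || since_batch_start >= commit_every
}
```

With the defaults from this task (1000 documents, 2 seconds), 999 pending documents after 1.9 seconds do not trigger a commit, while a single pending document after 2 seconds does.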
Crash Recovery
On TidalDb::open() (or TidalDb::builder().open()), after opening the Tantivy index:
let last_committed = TextIndexWriter::last_committed_seq(&text_index.index);
// The syncer will process events with seq > last_committed
// Since entity_writes are tracked, items written after last_committed
// will be re-submitted to the syncer automatically on the first cycle.
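As a rough sketch of the resume logic described above: the helpers below assume the commit payload holds the sequence number as a decimal string (the task only says the sequence is stored in the payload, so the encoding is an assumption), and they model writes as hypothetical (entity_id, seq) pairs of plain u64 values. Neither resume_seq() nor writes_to_replay() exists in the codebase.

```rust
/// Parse the resume point from a commit payload.
///
/// Assumption: the payload is the decimal form of the last committed sequence.
/// A missing or malformed payload yields 0, which means "replay everything".
pub fn resume_seq(payload: Option<&str>) -> u64 {
    payload
        .and_then(|p| p.trim().parse::<u64>().ok())
        .unwrap_or(0)
}

/// Select the writes that must be replayed after a crash:
/// those with a sequence number strictly above the last committed one.
pub fn writes_to_replay(all: &[(u64, u64)], last_committed: u64) -> Vec<(u64, u64)> {
    all.iter()
        .copied()
        .filter(|&(_entity_id, seq)| seq > last_committed)
        .collect()
}
```

Falling back to 0 on a bad payload trades extra work for safety: re-indexing an already indexed item is harmless, while skipping an unindexed one would leave the text index silently incomplete.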
For the initial implementation, implement rebuild_from():
impl TextIndex {
    /// Rebuild the Tantivy index from the entity store.
    ///
    /// Scans all items in the entity store and re-indexes them.
    /// The last committed sequence is set to `last_seq` after rebuild.
    ///
    /// Used for crash recovery and initial setup.
    pub fn rebuild_from(
        &self,
        storage: &dyn crate::storage::StorageEngine,
        last_seq: u64,
    ) -> crate::Result<()> {
        let mut writer = self.writer_guard()?;
        // Delete all existing documents
        writer
            .writer
            .delete_all_documents()
            .map_err(|e| TidalError::Internal(format!("tantivy delete_all: {e}")))?;
        // Scan all items from entity store
        for entry in storage.scan_prefix(&[]) {
            // Pass the conversion function directly; a wrapping closure
            // trips clippy's redundant_closure lint.
            let (key, value) = entry.map_err(TidalError::from)?;
            // Parse entity_id from key, metadata from value
            // ... decode and index each item
        }
        writer.commit(last_seq)
    }
}
Integration in TidalDb
Add to TidalDb:
- text_index: Option<Arc<TextIndex>> (None if no text fields are declared in the schema)
- text_tx: Option<crossbeam::channel::Sender<PendingWrite>> (channel to the syncer)
- text_syncer_thread: Option<std::thread::JoinHandle<crate::Result<()>>> (background thread)
On write_item_with_metadata(), after the entity store write, send to text_tx if Some.
On close() / shutdown(), drop text_tx to signal the syncer to flush and exit, then join the thread.
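The write path and shutdown sequence described above can be sketched with the standard library alone. This is an illustration under simplifying assumptions: MiniDb is a hypothetical miniature of TidalDb, std::sync::mpsc stands in for crossbeam, the event is a bare u64 sequence number instead of PendingWrite, and the stand-in syncer only counts events rather than indexing them.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::JoinHandle;

/// Hypothetical miniature of TidalDb holding only the text-sync plumbing.
pub struct MiniDb {
    text_tx: Option<Sender<u64>>,
    syncer: Option<JoinHandle<usize>>,
}

impl MiniDb {
    pub fn open() -> Self {
        let (tx, rx) = mpsc::channel::<u64>();
        // Stand-in syncer: drains the channel until every sender is dropped,
        // then returns how many events it processed.
        let handle = std::thread::spawn(move || {
            let mut processed = 0usize;
            for _seq in rx {
                processed += 1;
            }
            processed
        });
        Self {
            text_tx: Some(tx),
            syncer: Some(handle),
        }
    }

    /// The entity store write would happen first; then notify the syncer.
    pub fn write_item(&self, seq: u64) {
        if let Some(tx) = &self.text_tx {
            // send() fails only if the syncer has already exited; ignored here.
            let _ = tx.send(seq);
        }
    }

    /// Drop the sender to close the channel, then wait for the syncer to finish.
    /// Returns the number of events the syncer processed.
    pub fn close(mut self) -> usize {
        drop(self.text_tx.take());
        self.syncer
            .take()
            .map(|h| h.join().expect("syncer thread panicked"))
            .unwrap_or(0)
    }
}
```

The order in close() matters: joining before dropping the sender would deadlock, because the syncer only exits once the channel reports that all senders are gone.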
Acceptance Criteria
- TextIndexSyncer struct with new() and run() methods
- PendingWrite struct with entity_id, metadata, seq, deleted fields
- Syncer commits after commit_every_n documents
- Syncer commits after commit_every_secs timeout even with fewer documents
- Syncer flushes remaining documents when channel is closed (graceful shutdown)
- Each commit stores last_seq in the Tantivy commit payload
- TextIndex::rebuild_from(storage, last_seq) scans entity store and re-indexes all items
- TidalDb holds Option<Arc<TextIndex>>, which is None if the schema has no text fields
- TidalDb::write_item_with_metadata() sends PendingWrite to the syncer channel
- TidalDb::close() drops the channel sender and joins the syncer thread
- Unit tests: syncer_commits_on_batch, syncer_commits_on_timeout, syncer_flushes_on_shutdown, rebuild_from_indexes_all_items
- cargo check, cargo fmt, cargo clippy -- -D warnings all pass
Test Strategy
#[test]
fn syncer_commits_on_batch() {
    let (tx, rx) = crossbeam::channel::unbounded();
    let idx = Arc::new(TextIndex::ephemeral(&test_fields()).unwrap());
    // Batch size 3 with a 60 s interval, so only the batch rule can fire.
    let syncer = TextIndexSyncer::new(Arc::clone(&idx), rx, 3, 60);
    let handle = std::thread::spawn(move || syncer.run());
    // Send 3 items → triggers commit
    for i in 0..3u64 {
        tx.send(PendingWrite {
            entity_id: EntityId::new(i),
            metadata: make_meta(i),
            seq: i + 1,
            deleted: false,
        })
        .unwrap();
    }
    // Drop the sender so the syncer exits and the thread can be joined
    drop(tx);
    handle.join().unwrap().unwrap();
    // Verify all 3 items are in the index
    let searcher = idx.reader.searcher();
    assert_eq!(searcher.num_docs(), 3);
}