stemedb/crates/stemedb-ingest/src/ingestor.rs
jordan c59066949a feat: Add quickstart "Beyond Hello World" sections with Skeptic and Layered endpoints
- Add Layered() method to Go SDK for per-source-class consensus queries
- Add LayeredQueryParams, LayeredResult, TierResolution types to Go SDK
- Create conflict example demonstrating Skeptic and Layered endpoints
- Update quickstart.md with sections 6 (conflict detection) and 7 (authority tiers)
- Remove tracked Go binary and add data/ to .gitignore

The new quickstart sections demonstrate Episteme's differentiating features:
- Skeptic endpoint shows "Trust but Verify" conflict analysis
- Layered endpoint shows per-tier resolution (Clinical vs Anecdotal)

Note: Pre-existing large files flagged by pre-commit hook (technical debt from prior sessions)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:00:59 -07:00

121 lines
4.3 KiB
Rust

use crate::error::Result;
use crate::worker::IngestWorker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use stemedb_storage::KVStore;
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument, warn};
/// Manager for the background ingestion process.
///
/// The Ingestor owns a background task that continuously reads from the WAL
/// and writes to the KV store. It provides lifecycle management including
/// graceful shutdown coordination.
pub struct Ingestor<S> {
worker: Arc<Mutex<IngestWorker<S>>>,
handle: Option<JoinHandle<()>>,
/// Shared shutdown signal between Ingestor and background task.
shutdown: Arc<AtomicBool>,
}
impl<S: KVStore + 'static> Ingestor<S> {
/// Create a new Ingestor, loading the persisted cursor if available.
pub async fn new(journal: Arc<Mutex<Journal>>, store: Arc<S>) -> Result<Self> {
let shutdown = Arc::new(AtomicBool::new(false));
let worker = Arc::new(Mutex::new(
IngestWorker::with_shutdown(journal, store, shutdown.clone()).await?,
));
debug!("Ingestor created");
Ok(Self { worker, handle: None, shutdown })
}
/// Start the background ingestion task.
#[instrument(skip(self))]
pub fn start(&mut self) {
if self.handle.is_some() {
debug!("Ingestor already running");
return;
}
info!("Starting background ingestion task");
let worker = self.worker.clone();
self.handle = Some(tokio::spawn(async move {
let mut w = worker.lock().await;
w.run().await;
}));
}
/// Gracefully shut down the background ingestion task.
///
/// This signals the background task to stop and waits for it to exit.
/// If the task doesn't stop within the timeout, it will be forcibly aborted.
///
/// # Arguments
/// * `timeout` - Maximum time to wait for graceful shutdown before aborting.
#[instrument(skip(self))]
pub async fn shutdown(&mut self, timeout: Duration) {
// Signal shutdown
self.shutdown.store(true, Ordering::Relaxed);
info!("Shutdown signal sent to ingestion task");
if let Some(handle) = self.handle.take() {
// Wait for graceful shutdown with timeout
match tokio::time::timeout(timeout, handle).await {
Ok(Ok(())) => {
info!("Ingestion task shut down gracefully");
}
Ok(Err(e)) => {
warn!("Ingestion task panicked during shutdown: {:?}", e);
}
Err(_) => {
warn!("Ingestion task did not stop within {:?}, task will be dropped", timeout);
// The handle is already taken, so the task will be detached
// when the Ingestor is dropped. This is acceptable since
// we've already signaled shutdown.
}
}
} else {
debug!("No running ingestion task to shut down");
}
}
/// Check if the ingestor is currently running.
pub fn is_running(&self) -> bool {
self.handle.as_ref().is_some_and(|h| !h.is_finished())
}
/// Process pending WAL entries immediately (for testing).
#[instrument(skip(self))]
pub async fn process_pending(&self) -> Result<u64> {
let mut worker = self.worker.lock().await;
let mut total_bytes = 0;
loop {
let bytes = worker.step().await?;
if bytes == 0 {
break;
}
total_bytes += bytes;
}
debug!(total_bytes, "Processed pending entries");
Ok(total_bytes)
}
}
impl<S> Drop for Ingestor<S> {
fn drop(&mut self) {
// Signal shutdown to prevent the background task from accessing
// resources that may be dropped after us.
self.shutdown.store(true, Ordering::Relaxed);
// If the handle is still present, the task will be dropped when the
// JoinHandle is dropped. The task will see the shutdown signal and
// exit gracefully, or it will be aborted by the runtime.
if self.handle.is_some() {
debug!("Ingestor dropped with running task, shutdown signal sent");
}
}
}