use crate::error::Result;
use crate::worker::IngestWorker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use stemedb_storage::KVStore;
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument, warn};

/// Manager for the background ingestion process.
///
/// The Ingestor owns a background task that continuously reads from the WAL
/// and writes to the KV store. It provides lifecycle management including
/// graceful shutdown coordination.
///
/// The worker is shared between this manager and the spawned task behind an
/// async mutex, so both sides take the lock before driving the worker.
// NOTE(review): the type arguments on the fields below (and on the `new`
// signature) look truncated in this listing — confirm against the repository
// copy before relying on the exact types.
pub struct Ingestor {
    /// Worker that performs the ingestion steps; locked per use.
    worker: Arc>>,
    /// Handle of the spawned background task; `None` until a task is started.
    handle: Option>,
    /// Shared shutdown signal between Ingestor and background task.
    shutdown: Arc,
}

impl Ingestor {
    /// Create a new Ingestor, loading the persisted cursor if available.
    ///
    /// A fresh shutdown flag (initially `false`) is created and handed to
    /// `IngestWorker::with_shutdown` together with `journal` and `store`, so
    /// the worker and this manager observe the same flag. Any cursor loading
    /// happens inside that constructor, which is not visible in this file.
    ///
    /// No background task is spawned here: `handle` starts out as `None`.
    ///
    /// # Errors
    /// Propagates any error returned by `IngestWorker::with_shutdown`.
    pub async fn new(journal: Arc>, store: Arc) -> Result {
        let shutdown = Arc::new(AtomicBool::new(false));
        // The worker receives a clone of the flag; the original stays with `Self`.
        let worker = Arc::new(Mutex::new(
            IngestWorker::with_shutdown(journal, store, shutdown.clone()).await?,
        ));
        debug!("Ingestor created");
        Ok(Self { worker, handle: None, shutdown })
    }

    /// Start the background ingestion task.
#[instrument(skip(self))] pub fn start(&mut self) { if self.handle.is_some() { debug!("Ingestor already running"); return; } info!("Starting background ingestion task"); let worker = self.worker.clone(); let shutdown = self.shutdown.clone(); self.handle = Some(tokio::spawn(async move { // Don't hold the lock continuously - acquire it per iteration // to avoid blocking process_pending() and allow graceful shutdown loop { // Check shutdown before acquiring lock if shutdown.load(Ordering::Relaxed) { info!("Shutdown signal received before lock acquisition"); break; } let step_result = { let mut w = worker.lock().await; // Check shutdown again after acquiring lock if w.is_shutdown() { break; } w.step().await }; match step_result { Ok(0) => { // No new data, sleep briefly tokio::time::sleep(std::time::Duration::from_millis(10)).await; } Ok(_) => { // Processed data, continue immediately } Err(e) => { // On shutdown, WAL errors are expected if shutdown.load(Ordering::Relaxed) { debug!("Error during shutdown (expected): {:?}", e); break; } use crate::error::IngestError; match &e { IngestError::InputValidation(msg) => { warn!("Rejected invalid input: {}", msg); } IngestError::InvalidSignature(msg) => { warn!("Rejected invalid signature: {}", msg); } _ => { use tracing::error; error!("Ingestion error: {:?}", e); } } tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } } info!("Ingestion loop stopped"); })); } /// Gracefully shut down the background ingestion task. /// /// This signals the background task to stop and waits for it to exit. /// If the task doesn't stop within the timeout, it will be forcibly aborted. /// /// # Arguments /// * `timeout` - Maximum time to wait for graceful shutdown before aborting. 
#[instrument(skip(self))] pub async fn shutdown(&mut self, timeout: Duration) { // Signal shutdown self.shutdown.store(true, Ordering::Relaxed); info!("Shutdown signal sent to ingestion task"); if let Some(handle) = self.handle.take() { // Wait for graceful shutdown with timeout match tokio::time::timeout(timeout, handle).await { Ok(Ok(())) => { info!("Ingestion task shut down gracefully"); } Ok(Err(e)) => { warn!("Ingestion task panicked during shutdown: {:?}", e); } Err(_) => { warn!("Ingestion task did not stop within {:?}, task will be dropped", timeout); // The handle is already taken, so the task will be detached // when the Ingestor is dropped. This is acceptable since // we've already signaled shutdown. } } } else { debug!("No running ingestion task to shut down"); } } /// Check if the ingestor is currently running. pub fn is_running(&self) -> bool { self.handle.as_ref().is_some_and(|h| !h.is_finished()) } /// Process pending WAL entries immediately (for testing). #[instrument(skip(self))] pub async fn process_pending(&self) -> Result { let mut worker = self.worker.lock().await; let mut total_bytes = 0; loop { let bytes = worker.step().await?; if bytes == 0 { break; } total_bytes += bytes; } debug!(total_bytes, "Processed pending entries"); Ok(total_bytes) } } impl Drop for Ingestor { fn drop(&mut self) { // Signal shutdown to prevent the background task from accessing // resources that may be dropped after us. self.shutdown.store(true, Ordering::Relaxed); // If the handle is still present, the task will be dropped when the // JoinHandle is dropped. The task will see the shutdown signal and // exit gracefully, or it will be aborted by the runtime. if self.handle.is_some() { debug!("Ingestor dropped with running task, shutdown signal sent"); } } }