Task 03: WAL Compaction
Delivers
Automatic deletion of WAL segments that are fully covered by a successful checkpoint. After each periodic checkpoint and during graceful shutdown, segments with first_seq < checkpoint_seq are deleted one at a time, followed by a directory fsync. The write-new-checkpoint-then-delete-old pattern guarantees that at no point are both the checkpoint and the covering WAL segments absent. Crash during compaction is safe: the worst case is redundant segments that get replayed on next open.
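As a rough illustration of why this ordering is crash-safe, the sketch below models durable state in memory and stops the sequence at several points. Everything in it is hypothetical: CrashAt stands in for the Task 01 CrashPoint enum (whose variants are not shown in this task), segment ranges are assumed not to straddle the checkpoint, and checkpoint_seq is assumed to mean that all events with a smaller sequence number are in the checkpoint.

```rust
/// Hypothetical crash points for this sketch; the real `CrashPoint` enum
/// comes from Task 01 and may look different.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CrashAt {
    BeforeCheckpoint,
    AfterCheckpoint,
    MidCompaction,
    Never,
}

/// Modelled durable state. Assumption: events with `seq < checkpoint_seq`
/// are in the checkpoint; each segment is an inclusive (first_seq, last_seq).
#[derive(Clone, Debug)]
pub struct Durable {
    pub checkpoint_seq: u64,
    pub segments: Vec<(u64, u64)>,
}

/// Run "write checkpoint, then delete covered segments", stopping at `crash`.
pub fn checkpoint_then_compact(mut state: Durable, new_seq: u64, crash: CrashAt) -> Durable {
    if crash == CrashAt::BeforeCheckpoint {
        return state;
    }
    // The checkpoint becomes durable before any segment is touched.
    state.checkpoint_seq = new_seq;
    if crash == CrashAt::AfterCheckpoint {
        return state;
    }
    let covered: Vec<(u64, u64)> = state
        .segments
        .iter()
        .copied()
        .filter(|seg| seg.0 < new_seq)
        .collect();
    for (i, seg) in covered.iter().enumerate() {
        // Simulate a crash after exactly one segment has been deleted.
        if crash == CrashAt::MidCompaction && i == 1 {
            return state;
        }
        state.segments.retain(|s| s != seg);
    }
    state
}

/// True if every event in 1..=last_seq is either in the checkpoint or in a
/// surviving segment, i.e. nothing is lost on reopen.
pub fn recoverable(state: &Durable, last_seq: u64) -> bool {
    (1..=last_seq).all(|seq| {
        seq < state.checkpoint_seq
            || state
                .segments
                .iter()
                .any(|&(first, last)| first <= seq && seq <= last)
    })
}
```

In this model every crash point leaves a recoverable state, whereas deleting segments without first advancing the checkpoint does not.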
Complexity: M
Dependencies
- Task 01 (CrashPoint enum -- for testing compaction under crash conditions)
Technical Design
1. Compaction module
// tidal/src/wal/compaction.rs
use std::path::Path;

use super::error::WalError;
use super::segment;

/// Result of a compaction operation.
#[derive(Debug)]
pub struct CompactionResult {
    /// Number of WAL segments deleted.
    pub segments_deleted: usize,
    /// Total bytes reclaimed (sum of deleted segment file sizes).
    pub bytes_reclaimed: u64,
    /// Remaining segment count after compaction.
    pub segments_remaining: usize,
}
/// Delete WAL segments that are fully covered by the checkpoint at `checkpoint_seq`.
///
/// A segment with `first_seq < checkpoint_seq` is safe to delete because all its
/// events have been materialized to the signal ledger checkpoint. A segment with
/// `first_seq == checkpoint_seq` starts at the checkpoint boundary and may
/// contain events the checkpoint does not cover; it is NOT deleted
/// (conservative: we replay a few extra events rather than risk losing
/// uncovered events).
///
/// # Safety invariant
///
/// The caller must ensure the checkpoint at `checkpoint_seq` has been durably
/// committed to storage BEFORE calling this function. The order is:
///
/// 1. `ledger.checkpoint(storage, meta)` -- durable
/// 2. `storage.flush()` -- durable
/// 3. `compact_wal(wal_dir, meta.wal_sequence)` -- safe to lose old segments
///
/// If we crash between steps 1 and 3, old segments survive and are replayed
/// redundantly on next open. This is correct (idempotent).
///
/// If we crash during step 3 (partial deletion), some segments are deleted
/// and others are not. This is also correct: deleted segments are covered
/// by the checkpoint, surviving segments are replayed.
///
/// # Errors
///
/// Returns `WalError::Io` on filesystem failure. Partial deletion may occur
/// if an error is encountered mid-way -- this is safe (see invariant above).
pub fn compact_wal(wal_dir: &Path, checkpoint_seq: u64) -> Result<CompactionResult, WalError> {
    let segments = segment::list_segments(wal_dir)?;
    let total_before = segments.len();
    let mut deleted = 0usize;
    let mut bytes_reclaimed = 0u64;

    for (seg_first_seq, seg_path) in &segments {
        // Only delete segments whose first_seq is strictly less than the
        // checkpoint sequence. Segments starting at or after checkpoint_seq
        // may contain events that are not yet covered by the checkpoint.
        if *seg_first_seq < checkpoint_seq {
            // Read file size before deleting for the reclamation metric.
            let file_size = std::fs::metadata(seg_path)
                .map(|m| m.len())
                .unwrap_or(0);
            std::fs::remove_file(seg_path)?;
            deleted += 1;
            bytes_reclaimed += file_size;
            tracing::debug!(
                segment_first_seq = seg_first_seq,
                file_size,
                "compacted WAL segment"
            );
        }
    }

    // Fsync the directory to ensure the unlink operations are durable.
    // Without this, a crash after deletion but before directory metadata
    // flush could "resurrect" deleted segment files.
    if deleted > 0 {
        let dir_fd = std::fs::File::open(wal_dir)?;
        dir_fd.sync_all()?;
    }

    let remaining = total_before - deleted;
    tracing::info!(
        deleted,
        bytes_reclaimed,
        remaining,
        checkpoint_seq,
        "WAL compaction complete"
    );

    Ok(CompactionResult {
        segments_deleted: deleted,
        bytes_reclaimed,
        segments_remaining: remaining,
    })
}
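The first_seq < checkpoint_seq test is only safe if a segment that starts before the checkpoint also ends at or before it. This task does not state how segment rotation relates to checkpoints, so the following is a sketch of a more conservative predicate, not the function above. It treats a segment as covered only when the next segment also starts at or before checkpoint_seq. The helper name fully_covered_segments is hypothetical, and the input is assumed to be sorted ascending, as returned by a segment listing.

```rust
/// Hypothetical helper (not part of the task): return the `first_seq` of every
/// segment that ends before the checkpoint, judged by where its successor
/// starts. `first_seqs` must be sorted ascending.
pub fn fully_covered_segments(first_seqs: &[u64], checkpoint_seq: u64) -> Vec<u64> {
    first_seqs
        // Each window is (this segment's first_seq, next segment's first_seq).
        .windows(2)
        // The segment spans [pair[0], pair[1] - 1]; it is covered only if the
        // next segment starts at or before the checkpoint.
        .filter(|pair| pair[1] <= checkpoint_seq)
        .map(|pair| pair[0])
        .collect()
}
```

With segments starting at 1, 50, 100, 200 and a checkpoint at 100 this selects the same two segments as compact_wal. With segments at 1 and 50 and a checkpoint at 75 it keeps the segment starting at 50, which compact_wal would delete. This variant never removes the newest segment, so it would not satisfy compact_all_segments_old as written.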
2. Add pub mod compaction; to tidal/src/wal/mod.rs
pub mod compaction;
3. Integrate into periodic checkpoint thread
Modify tidal/src/db/state_rebuild.rs run_checkpoint_thread() to call compact_wal after each successful checkpoint:
// In run_checkpoint_thread, after the successful checkpoint block:
if let Err(e) = ledger.checkpoint(storage.as_ref(), meta) {
    tracing::error!(error = %e, "periodic signal checkpoint failed");
} else {
    tracing::debug!("periodic signal checkpoint written");

    // Compact WAL segments covered by this checkpoint.
    // wal_dir is data_dir/wal, passed in by the caller; it is None in
    // ephemeral mode, where there is nothing to compact.
    if let Some(wal_dir) = wal_dir.as_ref() {
        match crate::wal::compaction::compact_wal(wal_dir, meta.wal_sequence) {
            Ok(result) => {
                if result.segments_deleted > 0 {
                    tracing::info!(
                        deleted = result.segments_deleted,
                        reclaimed_bytes = result.bytes_reclaimed,
                        "WAL compacted after periodic checkpoint"
                    );
                }
            }
            Err(e) => {
                // Compaction failure is non-fatal: old segments just
                // take up disk space until the next compaction.
                tracing::warn!(error = %e, "WAL compaction failed");
            }
        }
    }
}
The run_checkpoint_thread function needs an additional parameter for the WAL directory path:
pub(super) fn run_checkpoint_thread(
    shutdown: Arc<AtomicBool>,
    ledger: Arc<SignalLedger>,
    cohort_ledger: Arc<CohortSignalLedger>,
    storage: Box<dyn StorageEngine + Send + Sync>,
    last_wal_seq: Arc<AtomicU64>,
    wal_dir: Option<std::path::PathBuf>, // NEW: WAL directory for compaction
) {
    // ...
}
4. Integrate into shutdown
Modify tidal/src/db/mod.rs shutdown_inner(). The existing shutdown already does:
- Checkpoint ledger to storage
- Write WAL checkpoint marker
- Truncate segments before checkpoint
Replace the last of these steps (truncating segments before the checkpoint) with the new compaction function:
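// In shutdown_inner, replace the truncate_before call.
// wal_dir() returns None in ephemeral mode, where there is nothing to compact.
if let Some(wal_dir) = self.wal_dir() {
    if let Err(e) = crate::wal::compaction::compact_wal(&wal_dir, seq) {
        tracing::error!(error = %e, "WAL compaction failed during shutdown");
    }
}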
The existing wal.truncate_before(seq) in shutdown_inner delegates to the writer thread's TruncateBefore command. The new compact_wal operates directly on the filesystem. Since we call compact_wal AFTER wal.checkpoint(seq) and the WAL writer is being shut down, there is no race. Both approaches delete the same segments; the new one adds directory fsync and metrics.
5. WAL directory accessor
Add a helper to TidalDb to derive the WAL directory path:
impl TidalDb {
    /// Return the WAL directory path for this database.
    ///
    /// Returns `None` in ephemeral mode (no WAL).
    fn wal_dir(&self) -> Option<std::path::PathBuf> {
        self.config.data_dir.as_ref().map(|d| d.join("wal"))
    }
}
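Because the accessor returns an Option, every call site has to decide what to do in ephemeral mode. The standalone sketch below shows one way to do that. Config and compact_if_persistent are hypothetical stand-ins for illustration; only the data_dir-joined-with-"wal" derivation is taken from the helper above.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the database configuration.
pub struct Config {
    /// `None` means ephemeral mode (no on-disk state, no WAL).
    pub data_dir: Option<PathBuf>,
}

/// Same derivation as the accessor above: `<data_dir>/wal`, or `None`.
pub fn wal_dir(config: &Config) -> Option<PathBuf> {
    config.data_dir.as_ref().map(|d| d.join("wal"))
}

/// Hypothetical call-site helper: invoke `compact` only when a WAL directory
/// exists. Returns whether compaction was attempted.
pub fn compact_if_persistent<F>(config: &Config, mut compact: F) -> bool
where
    F: FnMut(&Path),
{
    match wal_dir(config) {
        Some(dir) => {
            compact(&dir);
            true
        }
        // Ephemeral mode: there are no segments to delete.
        None => false,
    }
}
```

Passing the directory as &Path to the callback matches the compact_wal signature, so the Option never has to be unwrapped at the call site.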
Acceptance Criteria
- compact_wal(dir, seq) deletes all segments with first_seq < seq
- Segments with first_seq >= seq are preserved
- Directory is fsynced after deletion to ensure durability of unlinks
- CompactionResult reports segments_deleted, bytes_reclaimed, segments_remaining
- Periodic checkpoint thread calls compact_wal after each successful checkpoint
- Shutdown calls compact_wal after writing the WAL checkpoint marker
- Compaction failure is non-fatal (logged as warning, does not abort shutdown or checkpoint)
- Crash during compaction is safe: reopen replays any surviving segments correctly
- cargo test --manifest-path tidal/Cargo.toml passes with compaction integrated
- Unit tests: compact_empty_dir, compact_deletes_old_segments, compact_preserves_current_segment, compact_no_segments_to_delete, compact_all_segments_old, compact_crash_during_deletion_is_safe
Test Strategy
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wal::segment::{SegmentWriter, list_segments, segment_filename};
    use std::fs;

    #[test]
    fn compact_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 0);
    }

    #[test]
    fn compact_deletes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        // Create segments at seq 1, 50, 100, 200.
        for &seq in &[1u64, 50, 100, 200] {
            let _ = SegmentWriter::open(dir.path(), seq, 1024).unwrap();
        }
        assert_eq!(list_segments(dir.path()).unwrap().len(), 4);

        // Compact with checkpoint at seq=100.
        // Segments 1 and 50 have first_seq < 100, so they are deleted.
        // Segments 100 and 200 are preserved.
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 2);
        assert_eq!(result.segments_remaining, 2);

        let remaining = list_segments(dir.path()).unwrap();
        assert_eq!(remaining[0].0, 100);
        assert_eq!(remaining[1].0, 200);
    }

    #[test]
    fn compact_preserves_current_segment() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 100, 1024).unwrap();

        // Checkpoint at seq=100: segment starting at 100 is NOT deleted
        // (it may contain events >= 100).
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }

    #[test]
    fn compact_no_segments_to_delete() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 500, 1024).unwrap();

        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }

    #[test]
    fn compact_all_segments_old() {
        let dir = tempfile::tempdir().unwrap();
        for &seq in &[1u64, 10, 20] {
            let _ = SegmentWriter::open(dir.path(), seq, 1024).unwrap();
        }

        let result = compact_wal(dir.path(), 1000).unwrap();
        assert_eq!(result.segments_deleted, 3);
        assert_eq!(result.segments_remaining, 0);
    }

    #[test]
    fn compact_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 1, 1024).unwrap();
        let _ = SegmentWriter::open(dir.path(), 100, 1024).unwrap();

        compact_wal(dir.path(), 100).unwrap();

        // Running compaction again should be a no-op.
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }
}
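The module above has no test for a crash partway through deletion. One possible shape for such a test is sketched below as a standalone, std-only simulation, so it does not call the real compact_wal or SegmentWriter. The segment file naming in seg_path, the crash_after parameter, and the helper names are all assumptions made for illustration; the real test would presumably drive the Task 01 CrashPoint hook instead.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical segment file name; the real format lives in the segment module.
fn seg_path(dir: &Path, first_seq: u64) -> PathBuf {
    dir.join(format!("{first_seq:020}.wal"))
}

/// List the first_seq of every `.wal` file in `dir`, ascending.
fn list_first_seqs(dir: &Path) -> io::Result<Vec<u64>> {
    let mut seqs = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().and_then(|e| e.to_str()) != Some("wal") {
            continue;
        }
        let stem = path.file_stem().and_then(|s| s.to_str());
        if let Some(seq) = stem.and_then(|s| s.parse::<u64>().ok()) {
            seqs.push(seq);
        }
    }
    seqs.sort_unstable();
    Ok(seqs)
}

/// Same deletion predicate as compact_wal, but returns early once
/// `crash_after` segments have been deleted, to mimic a crash mid-compaction.
fn compact_with_crash(dir: &Path, checkpoint_seq: u64, crash_after: Option<usize>) -> io::Result<usize> {
    let mut deleted = 0usize;
    for seq in list_first_seqs(dir)? {
        if crash_after == Some(deleted) {
            return Ok(deleted);
        }
        if seq < checkpoint_seq {
            fs::remove_file(seg_path(dir, seq))?;
            deleted += 1;
        }
    }
    Ok(deleted)
}

/// Create segments 1, 50, 100, 200; crash after one deletion; then compact
/// again as a reopen would. Returns the listings after each phase.
fn crash_then_recover() -> io::Result<(Vec<u64>, Vec<u64>)> {
    let dir = std::env::temp_dir().join(format!("wal-compaction-sketch-{}", std::process::id()));
    let _ = fs::remove_dir_all(&dir);
    fs::create_dir_all(&dir)?;
    for seq in [1u64, 50, 100, 200] {
        fs::write(seg_path(&dir, seq), b"")?;
    }

    compact_with_crash(&dir, 100, Some(1))?;
    let after_crash = list_first_seqs(&dir)?;

    compact_with_crash(&dir, 100, None)?;
    let after_recovery = list_first_seqs(&dir)?;

    fs::remove_dir_all(&dir)?;
    Ok((after_crash, after_recovery))
}
```

After the simulated crash, the segment starting at 50 is still present alongside 100 and 200. The second pass removes it, leaving the same state as an uninterrupted compaction.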