tidaldb/docs/planning/milestone-7/phase-1/task-03-wal-compaction.md
2026-02-23 22:41:16 -07:00


Task 03: WAL Compaction

Delivers

Automatic deletion of WAL segments that are fully covered by a successful checkpoint. After each periodic checkpoint and during graceful shutdown, every segment whose events all precede checkpoint_seq (that is, every segment whose successor starts at or before checkpoint_seq) is deleted; the newest segment is always kept. Deletion is not atomic across segments, but the write-new-checkpoint-then-delete-old pattern guarantees that at no point are both the checkpoint and the covering WAL segments absent. A crash during compaction is therefore safe: the worst case is redundant segments that are replayed on next open.
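To make "fully covered" concrete: a segment's first_seq only says where it starts, so coverage has to be judged from where the next segment begins. The sketch below is a standalone illustration over plain start sequences, not tidaldb code. covered_segments and naive_segments are hypothetical names, and the sketch assumes the start sequences are sorted ascending and that a checkpoint at checkpoint_seq covers every event below it.

```rust
/// Hypothetical illustration of the "fully covered" rule.
/// `firsts` holds segment start sequences sorted ascending, so segment i
/// holds the events in [firsts[i], firsts[i + 1]).
/// Returns the start sequences of segments whose every event is < checkpoint_seq.
fn covered_segments(firsts: &[u64], checkpoint_seq: u64) -> Vec<u64> {
    firsts
        .windows(2)
        // A segment is covered when its successor starts at or before the
        // checkpoint. The newest segment has no successor, so it is never covered.
        .take_while(|w| w[1] <= checkpoint_seq)
        .map(|w| w[0])
        .collect()
}

/// The weaker rule, for comparison: it only looks at where a segment starts
/// and ignores where it ends.
fn naive_segments(firsts: &[u64], checkpoint_seq: u64) -> Vec<u64> {
    firsts
        .iter()
        .copied()
        .filter(|&f| f < checkpoint_seq)
        .collect()
}
```

With segments starting at 1, 50 and 200 and a checkpoint at 100, covered_segments returns [1] while naive_segments returns [1, 50]. Assuming contiguous sequence numbers, the segment starting at 50 still holds events 100 through 199, which the checkpoint does not cover.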

Complexity: M

Dependencies

  • Task 01 (CrashPoint enum -- for testing compaction under crash conditions)

Technical Design

1. Compaction module

// tidal/src/wal/compaction.rs

use std::path::Path;

use super::error::WalError;
use super::segment;

/// Result of a compaction operation.
#[derive(Debug)]
pub struct CompactionResult {
    /// Number of WAL segments deleted.
    pub segments_deleted: usize,
    /// Total bytes reclaimed (sum of deleted segment file sizes).
    pub bytes_reclaimed: u64,
    /// Remaining segment count after compaction.
    pub segments_remaining: usize,
}

/// Delete WAL segments that are fully covered by the checkpoint at `checkpoint_seq`.
///
/// Segments are sorted by `first_seq`, so segment `i` holds the events in
/// `[first_seq_i, first_seq_{i+1})`. A segment's own `first_seq` says nothing
/// about where it ends: with segments starting at 1, 50 and 200 and a
/// checkpoint at 100, the segment starting at 50 still holds events 100..=199.
/// A segment is therefore deleted only when its successor starts at or before
/// `checkpoint_seq`; then every event it holds is below `checkpoint_seq` and
/// has been materialized to the signal ledger checkpoint.
///
/// The newest segment is never deleted: its end is unknown and, while the WAL
/// writer is running, it is the segment being appended to. This is
/// conservative: we replay a few extra events rather than risk losing
/// uncovered ones.
///
/// # Safety invariant
///
/// The caller must ensure the checkpoint at `checkpoint_seq` has been durably
/// committed to storage BEFORE calling this function. The order is:
///
/// 1. `ledger.checkpoint(storage, meta)` -- durable
/// 2. `storage.flush()` -- durable
/// 3. `compact_wal(wal_dir, meta.wal_sequence)` -- safe to lose old segments
///
/// If we crash between steps 1 and 3, old segments survive and are replayed
/// redundantly on next open. This is correct because replay is idempotent.
///
/// If we crash during step 3 (partial deletion), segments are deleted oldest
/// first, so some covered segments are gone and the rest survive. This is also
/// correct: deleted segments are covered by the checkpoint, surviving segments
/// are replayed.
///
/// # Errors
///
/// Returns `WalError::Io` on filesystem failure. Partial deletion may occur
/// if an error is encountered mid-way -- this is safe (see invariant above).
pub fn compact_wal(wal_dir: &Path, checkpoint_seq: u64) -> Result<CompactionResult, WalError> {
    // Assumed: `list_segments` returns `(first_seq, path)` pairs sorted by
    // `first_seq` (the tests below rely on this ordering).
    let segments = segment::list_segments(wal_dir)?;
    let total_before = segments.len();

    let mut deleted = 0usize;
    let mut bytes_reclaimed = 0u64;

    // Pair each segment with its successor. The newest segment has no
    // successor, so `windows(2)` never offers it for deletion.
    for pair in segments.windows(2) {
        let (seg_first_seq, seg_path) = &pair[0];
        let next_first_seq = pair[1].0;

        // This segment may hold events at or after the checkpoint. Segments
        // are sorted, so no later segment is covered either.
        if next_first_seq > checkpoint_seq {
            break;
        }

        // Read file size before deleting for the reclamation metric.
        let file_size = std::fs::metadata(seg_path)
            .map(|m| m.len())
            .unwrap_or(0);

        std::fs::remove_file(seg_path)?;
        deleted += 1;
        bytes_reclaimed += file_size;

        tracing::debug!(
            segment_first_seq = *seg_first_seq,
            file_size,
            "compacted WAL segment"
        );
    }

    // Fsync the directory to ensure the unlink operations are durable.
    // Without this, a crash after deletion but before the directory metadata
    // is flushed could "resurrect" deleted segment files. Opening a directory
    // with File::open works on Unix but fails on Windows, hence the cfg.
    #[cfg(unix)]
    {
        if deleted > 0 {
            let dir_fd = std::fs::File::open(wal_dir)?;
            dir_fd.sync_all()?;
        }
    }

    let remaining = total_before - deleted;

    tracing::info!(
        deleted,
        bytes_reclaimed,
        remaining,
        checkpoint_seq,
        "WAL compaction complete"
    );

    Ok(CompactionResult {
        segments_deleted: deleted,
        bytes_reclaimed,
        segments_remaining: remaining,
    })
}
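The partial-deletion argument in the doc comment can be checked with a small model. The sketch below is a hypothetical, std-only simulation, not part of the planned module: events are numbered 1..=last_seq, the checkpoint covers every event below checkpoint_seq, compaction deletes covered segments oldest first, and a crash after k deletions leaves exactly the first k covered segments gone.

```rust
/// Hypothetical model of the crash-safety argument.
/// `firsts`: segment start sequences, sorted ascending.
/// Returns true if, after a crash that deleted only the first `k` covered
/// segments, every event 1..=last_seq is either below the checkpoint or still
/// present in a surviving segment.
fn recoverable_after_crash(firsts: &[u64], last_seq: u64, checkpoint_seq: u64, k: usize) -> bool {
    // Number of segments compaction would delete: those whose successor
    // starts at or before the checkpoint.
    let covered = firsts
        .windows(2)
        .take_while(|w| w[1] <= checkpoint_seq)
        .count();
    // A crash can only interrupt deletion, never extend it.
    let deleted = k.min(covered);

    (1..=last_seq).all(|seq| {
        if seq < checkpoint_seq {
            return true; // materialized in the checkpoint
        }
        // The segment holding `seq` is the last one starting at or before it.
        match firsts.iter().rposition(|&f| f <= seq) {
            Some(i) => i >= deleted, // replayable only if that segment survived
            None => false,           // no segment ever held this event
        }
    })
}
```

Iterating k over every possible crash point and asserting recoverability is one way a compact_crash_during_deletion_is_safe test could be structured at the model level; a CrashPoint-based test from Task 01 would exercise the real filesystem instead.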

2. Add pub mod compaction; to tidal/src/wal/mod.rs

pub mod compaction;

3. Integrate into periodic checkpoint thread

Modify tidal/src/db/state_rebuild.rs run_checkpoint_thread() to call compact_wal after each successful checkpoint:

// In run_checkpoint_thread, extend the existing checkpoint block:

if let Err(e) = ledger.checkpoint(storage.as_ref(), meta) {
    tracing::error!(error = %e, "periodic signal checkpoint failed");
} else {
    tracing::debug!("periodic signal checkpoint written");

    // Compact WAL segments covered by this checkpoint. This is only safe once
    // the checkpoint is durable (see the safety invariant on compact_wal); if
    // ledger.checkpoint() does not flush storage itself, flush here and skip
    // compaction on failure.
    // wal_dir is the new parameter (data_dir/wal); it is None in ephemeral mode.
    if let Some(wal_dir) = wal_dir.as_ref() {
        match crate::wal::compaction::compact_wal(wal_dir, meta.wal_sequence) {
            Ok(result) => {
                if result.segments_deleted > 0 {
                    tracing::info!(
                        deleted = result.segments_deleted,
                        reclaimed_bytes = result.bytes_reclaimed,
                        "WAL compacted after periodic checkpoint"
                    );
                }
            }
            Err(e) => {
                // Compaction failure is non-fatal: old segments just
                // take up disk space until the next compaction.
                tracing::warn!(error = %e, "WAL compaction failed");
            }
        }
    }
}
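The control flow above reduces to two rules: never compact unless the checkpoint succeeded, and never let a compaction error escape the tick. The sketch below isolates that ordering with closures standing in for ledger.checkpoint and compact_wal. checkpoint_tick and TickOutcome are hypothetical names, and the checkpoint closure is assumed to return its sequence only once the checkpoint is durable.

```rust
/// Hypothetical outcome of one periodic checkpoint tick.
#[derive(Debug, PartialEq)]
enum TickOutcome {
    CheckpointFailed(String),
    Compacted { checkpoint_seq: u64, segments_deleted: usize },
    CompactionFailed(String),
}

/// One tick: `checkpoint` returns the durable checkpoint sequence,
/// `compact` returns the number of segments it deleted.
fn checkpoint_tick(
    checkpoint: impl FnOnce() -> Result<u64, String>,
    compact: impl FnOnce(u64) -> Result<usize, String>,
) -> TickOutcome {
    let checkpoint_seq = match checkpoint() {
        Ok(seq) => seq,
        // Never touch the WAL when the checkpoint may not exist.
        Err(e) => return TickOutcome::CheckpointFailed(e),
    };
    match compact(checkpoint_seq) {
        Ok(segments_deleted) => TickOutcome::Compacted { checkpoint_seq, segments_deleted },
        // Non-fatal: old segments stay on disk until the next tick.
        Err(e) => TickOutcome::CompactionFailed(e),
    }
}
```

Returning an outcome instead of propagating errors mirrors the non-fatal policy: the caller logs the CompactionFailed case and keeps the thread running.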

The run_checkpoint_thread function needs an additional parameter for the WAL directory path:

pub(super) fn run_checkpoint_thread(
    shutdown: Arc<AtomicBool>,
    ledger: Arc<SignalLedger>,
    cohort_ledger: Arc<CohortSignalLedger>,
    storage: Box<dyn StorageEngine + Send + Sync>,
    last_wal_seq: Arc<AtomicU64>,
    wal_dir: Option<std::path::PathBuf>,  // NEW: WAL directory for compaction
) {
    // ...
}

4. Integrate into shutdown

Modify tidal/src/db/mod.rs shutdown_inner(). The existing shutdown already does:

  1. Checkpoint ledger to storage
  2. Write WAL checkpoint marker
  3. Truncate segments before checkpoint

Replace step 3 with the new compaction function:

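// In shutdown_inner, replace the wal.truncate_before(seq) call.
// wal_dir() returns None in ephemeral mode, where there is nothing to compact.
if let Some(wal_dir) = self.wal_dir() {
    if let Err(e) = crate::wal::compaction::compact_wal(&wal_dir, seq) {
        // Non-fatal, as in the periodic path: shutdown continues.
        tracing::warn!(error = %e, "WAL compaction failed during shutdown");
    }
}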

The existing wal.truncate_before(seq) in shutdown_inner delegates to the writer thread's TruncateBefore command, whereas compact_wal operates directly on the filesystem. Because compact_wal runs after wal.checkpoint(seq), it cannot race with the WAL writer provided the writer thread has stopped accepting appends (ideally, has been joined) before compaction starts; if the writer may still be running at that point, stop it first. Both approaches remove segments that precede the checkpoint; the new one adds a directory fsync and metrics.
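The no-race argument depends on ordering: the writer must be stopped before any segment is deleted. This hypothetical sketch models that with a toy writer thread fed over std::sync::mpsc (WriterCmd, spawn_writer and shutdown_then_compact are illustrative names, not the real WAL writer API): shutdown sends a stop command, joins the thread, and only then runs compaction.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical command set for a toy WAL writer.
enum WriterCmd {
    Append(u64),
    Stop,
}

/// Spawn a toy writer that records appended sequence numbers in order and
/// returns them when it stops.
fn spawn_writer() -> (Sender<WriterCmd>, JoinHandle<Vec<u64>>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut log = Vec::new();
        for cmd in rx {
            match cmd {
                WriterCmd::Append(seq) => log.push(seq),
                WriterCmd::Stop => break,
            }
        }
        log
    });
    (tx, handle)
}

/// Stop and join the writer, then compact up to the last sequence it wrote.
fn shutdown_then_compact(
    tx: Sender<WriterCmd>,
    handle: JoinHandle<Vec<u64>>,
    compact: impl FnOnce(u64),
) -> Vec<u64> {
    // If the writer already exited, send fails; join below still waits for it.
    let _ = tx.send(WriterCmd::Stop);
    let log = handle.join().expect("writer thread panicked");
    // join() has returned: the writer can no longer touch segment files.
    compact(log.last().copied().unwrap_or(0));
    log
}
```

Because JoinHandle::join returns only after the writer thread has exited, every append sent on the same channel before the stop command has been processed, and no append can overlap the deletion.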

5. WAL directory accessor

Add a helper to TidalDb to derive the WAL directory path:

impl TidalDb {
    /// Return the WAL directory path for this database.
    ///
    /// Returns `None` in ephemeral mode (no WAL).
    fn wal_dir(&self) -> Option<std::path::PathBuf> {
        self.config.data_dir.as_ref().map(|d| d.join("wal"))
    }
}
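Because wal_dir() returns an Option, call sites have to branch on ephemeral mode rather than pass the path straight through. A minimal sketch with a stand-in Config struct (hypothetical; only the data_dir: Option<PathBuf> field mirrors the accessor above), plus a hypothetical compact_if_persistent helper showing the call-site pattern:

```rust
use std::path::{Path, PathBuf};

/// Stand-in for the relevant part of the database config (hypothetical).
struct Config {
    /// `None` means ephemeral mode: no data directory, no WAL.
    data_dir: Option<PathBuf>,
}

/// Same derivation as the accessor above: data_dir/wal, or None.
fn wal_dir(config: &Config) -> Option<PathBuf> {
    config.data_dir.as_ref().map(|d| d.join("wal"))
}

/// Run `compact` only when a WAL directory exists; returns whether it ran.
fn compact_if_persistent(config: &Config, compact: impl FnOnce(&Path)) -> bool {
    match wal_dir(config) {
        Some(dir) => {
            compact(&dir);
            true
        }
        None => false,
    }
}
```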

Acceptance Criteria

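  • compact_wal(dir, seq) deletes every segment whose successor starts at or before seq, i.e. every segment whose events all precede seq
  • The newest segment, and any segment that may hold events at or after seq, is preserved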
  • Directory is fsynced after deletion to ensure durability of unlinks
  • CompactionResult reports segments_deleted, bytes_reclaimed, segments_remaining
  • Periodic checkpoint thread calls compact_wal after each successful checkpoint
  • Shutdown calls compact_wal after writing the WAL checkpoint marker
  • Compaction failure is non-fatal (logged as warning, does not abort shutdown or checkpoint)
  • Crash during compaction is safe: reopen replays any surviving segments correctly
  • cargo test --manifest-path tidal/Cargo.toml passes with compaction integrated
  • Unit tests: compact_empty_dir, compact_deletes_old_segments, compact_preserves_current_segment, compact_no_segments_to_delete, compact_all_segments_old, compact_idempotent, compact_crash_during_deletion_is_safe

Test Strategy

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wal::segment::{SegmentWriter, list_segments};

    #[test]
    fn compact_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 0);
    }

    #[test]
    fn compact_deletes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        // Create segments at seq 1, 50, 100, 200.
        for &seq in &[1u64, 50, 100, 200] {
            let _ = SegmentWriter::open(dir.path(), seq, 1024).unwrap();
        }
        assert_eq!(list_segments(dir.path()).unwrap().len(), 4);

        // Compact with checkpoint at seq=100.
        // Segments 1 and 50 end before seq 100 (their successors start at
        // 50 and 100), so they are deleted. Segments 100 and 200 are preserved.
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 2);
        assert_eq!(result.segments_remaining, 2);

        let remaining = list_segments(dir.path()).unwrap();
        assert_eq!(remaining[0].0, 100);
        assert_eq!(remaining[1].0, 200);
    }

    #[test]
    fn compact_preserves_current_segment() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 100, 1024).unwrap();

        // Checkpoint at seq=100: segment starting at 100 is NOT deleted
        // (it may contain events >= 100).
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }

    #[test]
    fn compact_no_segments_to_delete() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 500, 1024).unwrap();

        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }

    #[test]
    fn compact_all_segments_old() {
        let dir = tempfile::tempdir().unwrap();
        for &seq in &[1u64, 10, 20] {
            let _ = SegmentWriter::open(dir.path(), seq, 1024).unwrap();
        }

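        // The newest segment (20) is kept even though the checkpoint is far
        // ahead: its end is unknown and it may be the active segment.
        let result = compact_wal(dir.path(), 1000).unwrap();
        assert_eq!(result.segments_deleted, 2);
        assert_eq!(result.segments_remaining, 1);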
    }

    #[test]
    fn compact_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let _ = SegmentWriter::open(dir.path(), 1, 1024).unwrap();
        let _ = SegmentWriter::open(dir.path(), 100, 1024).unwrap();

        compact_wal(dir.path(), 100).unwrap();
        // Running compaction again should be a no-op.
        let result = compact_wal(dir.path(), 100).unwrap();
        assert_eq!(result.segments_deleted, 0);
        assert_eq!(result.segments_remaining, 1);
    }
}