rdev/internal/worker/operation_cleanup.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

125 lines
2.9 KiB
Go

package worker
import (
"context"
"sync"
"time"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// OperationCleanup runs periodic cleanup of old operations.
// Operations older than the retention period (default 30 days) are deleted.
//
// Construct it with NewOperationCleanup, call Start to launch the background
// loop, and Stop to cancel it and wait for the loop goroutine to exit.
// Start should be called at most once: each call launches another loop.
type OperationCleanup struct {
// repo is asked to delete old operations via DeleteOlderThan on every pass.
repo port.OperationRepository
// retentionPeriod is subtracted from the current time to form the cutoff
// passed to DeleteOlderThan.
retentionPeriod time.Duration
// cleanupInterval is the ticker period between passes (after the first
// pass, which runs as soon as the loop starts).
cleanupInterval time.Duration
// ctx is the worker's lifecycle context, created in NewOperationCleanup.
// Each pass derives a TimeoutMaintenance-bounded context from it.
ctx context.Context
// cancel cancels ctx; Stop calls it to end the loop.
cancel context.CancelFunc
// wg tracks the cleanupLoop goroutine so Stop can block until it returns.
wg sync.WaitGroup
}
// OperationCleanupConfig holds configuration for operation cleanup.
// DefaultOperationCleanupConfig returns the recommended values; passing a nil
// config to NewOperationCleanup selects them.
type OperationCleanupConfig struct {
// RetentionPeriod is how long to keep operations. Each cleanup pass
// calls DeleteOlderThan with a cutoff of time.Now() minus this duration.
// Default (as set by DefaultOperationCleanupConfig): 30 days.
RetentionPeriod time.Duration
// CleanupInterval is how often to run cleanup after the initial pass,
// which runs as soon as the worker starts.
// Default (as set by DefaultOperationCleanupConfig): 1 hour.
CleanupInterval time.Duration
}
// DefaultOperationCleanupConfig returns the recommended configuration:
// operations are retained for 30 days and cleanup runs once per hour.
// A fresh value is returned on every call, so callers may modify it freely.
func DefaultOperationCleanupConfig() *OperationCleanupConfig {
	const (
		retention = 30 * 24 * time.Hour // 30 days
		interval  = time.Hour
	)

	cfg := new(OperationCleanupConfig)
	cfg.RetentionPeriod = retention
	cfg.CleanupInterval = interval
	return cfg
}
// NewOperationCleanup creates a new operation cleanup worker backed by repo.
//
// A nil cfg selects DefaultOperationCleanupConfig. In a non-nil cfg, any
// non-positive field is replaced by the corresponding default value:
//   - RetentionPeriod <= 0 would put the deletion cutoff at (or after) the
//     current time, deleting every operation on the first pass.
//   - CleanupInterval <= 0 would make time.NewTicker panic inside the cleanup
//     goroutine, crashing the process.
//
// The caller's cfg is never modified. The returned worker does nothing until
// Start is called.
func NewOperationCleanup(repo port.OperationRepository, cfg *OperationCleanupConfig) *OperationCleanup {
	defaults := DefaultOperationCleanupConfig()
	if cfg == nil {
		cfg = defaults
	}

	retention := cfg.RetentionPeriod
	if retention <= 0 {
		retention = defaults.RetentionPeriod
	}

	interval := cfg.CleanupInterval
	if interval <= 0 {
		interval = defaults.CleanupInterval
	}

	// The worker owns its lifecycle context; Stop cancels it.
	ctx, cancel := context.WithCancel(context.Background())
	return &OperationCleanup{
		repo:            repo,
		retentionPeriod: retention,
		cleanupInterval: interval,
		ctx:             ctx,
		cancel:          cancel,
	}
}
// Start launches the background cleanup goroutine and returns immediately.
// The effective retention period and interval are logged first. Pair every
// Start with a Stop; calling Start more than once launches additional loops.
func (c *OperationCleanup) Start() {
	logger := logging.FromContext(c.ctx).WithWorker("operation-cleanup")
	logger.Info("operation cleanup started", "retention_period", c.retentionPeriod, "cleanup_interval", c.cleanupInterval)

	// Register the goroutine before launching it so a concurrent Stop cannot
	// observe a zero counter and return early.
	c.wg.Add(1)
	go c.cleanupLoop()
}
// Stop cancels the worker's context and blocks until the cleanup goroutine
// has returned. It logs once before and once after the shutdown completes.
func (c *OperationCleanup) Stop() {
	logger := logging.FromContext(c.ctx).WithWorker("operation-cleanup")
	logger.Info("operation cleanup stopping")

	// Cancelling wakes cleanupLoop's select and also cancels the context
	// handed to any in-flight DeleteOlderThan call.
	c.cancel()
	// Wait for cleanupLoop to finish (it calls Done on exit).
	c.wg.Wait()

	logger.Info("operation cleanup stopped")
}
// cleanupLoop is the body of the worker goroutine: one pass right away, then
// one pass per cleanupInterval tick until the worker context is cancelled.
func (c *OperationCleanup) cleanupLoop() {
	defer c.wg.Done()

	// The first pass does not wait for a full interval to elapse.
	c.runCleanup()

	tick := time.NewTicker(c.cleanupInterval)
	defer tick.Stop()

	done := c.ctx.Done()
	for {
		select {
		case <-tick.C:
			c.runCleanup()
		case <-done:
			return
		}
	}
}
// runCleanup performs one cleanup pass. It asks the repository to delete
// operations older than time.Now() minus the retention period, with the call
// bounded by TimeoutMaintenance.
//
// Once Stop has cancelled the worker context, the pass is skipped entirely.
// A repository error that occurs while the worker is shutting down is logged
// at Info level rather than reported as a failure. Otherwise every graceful
// shutdown that lands mid-pass would emit a spurious error log.
func (c *OperationCleanup) runCleanup() {
	// cleanupLoop's select picks randomly among ready cases, so a pending tick
	// can still be chosen after cancellation; don't start work that will fail.
	if c.ctx.Err() != nil {
		return
	}

	ctx, cancel := context.WithTimeout(c.ctx, TimeoutMaintenance)
	defer cancel()

	log := logging.FromContext(ctx).WithWorker("operation-cleanup")

	cutoff := time.Now().Add(-c.retentionPeriod)
	deleted, err := c.repo.DeleteOlderThan(ctx, cutoff)
	if err != nil {
		if c.ctx.Err() != nil {
			// The worker is stopping; the error is most likely the result of
			// cancelling the in-flight call. Keep it visible, but not as a failure.
			log.Info("operation cleanup interrupted by shutdown",
				logging.FieldError, err,
				"cutoff", cutoff,
			)
			return
		}
		log.Error("failed to cleanup old operations",
			logging.FieldError, err,
			"cutoff", cutoff,
		)
		return
	}

	if deleted > 0 {
		log.Info("cleaned up old operations",
			"deleted", deleted,
			"cutoff", cutoff,
		)
	}
}