rdev/internal/worker/queue_processor.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

368 lines
9.9 KiB
Go

// Package worker provides background workers for async task processing.
package worker
import (
"context"
"encoding/json"
"strings"
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// QueueProcessor processes queued commands in the background.
type QueueProcessor struct {
queue port.CommandQueue
executor port.CommandExecutor
projects port.ProjectRepository
streams port.StreamPublisher
webhookDispatcher port.WebhookDispatcher
pollPeriod time.Duration
// Shutdown management
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Track active project workers
projectWorkers map[string]context.CancelFunc
projectMu sync.Mutex
}
// QueueProcessorConfig holds configuration for the queue processor.
type QueueProcessorConfig struct {
PollPeriod time.Duration
}
// DefaultQueueProcessorConfig returns sensible defaults.
func DefaultQueueProcessorConfig() *QueueProcessorConfig {
return &QueueProcessorConfig{
PollPeriod: 5 * time.Second,
}
}
// NewQueueProcessor creates a new queue processor.
func NewQueueProcessor(
queue port.CommandQueue,
executor port.CommandExecutor,
projects port.ProjectRepository,
streams port.StreamPublisher,
cfg *QueueProcessorConfig,
) *QueueProcessor {
if cfg == nil {
cfg = DefaultQueueProcessorConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &QueueProcessor{
queue: queue,
executor: executor,
projects: projects,
streams: streams,
pollPeriod: cfg.PollPeriod,
ctx: ctx,
cancel: cancel,
projectWorkers: make(map[string]context.CancelFunc),
}
}
// WithWebhookDispatcher sets a webhook dispatcher for event notifications.
func (p *QueueProcessor) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *QueueProcessor {
p.webhookDispatcher = dispatcher
return p
}
// Start begins processing the command queue.
// It spawns a worker for each known project.
func (p *QueueProcessor) Start() error {
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
log.Info("queue processor starting")
// Start the main coordinator that manages per-project workers
p.wg.Add(1)
go p.coordinator()
return nil
}
// Stop gracefully shuts down the queue processor.
func (p *QueueProcessor) Stop() {
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
log.Info("queue processor stopping")
p.cancel()
p.wg.Wait()
log.Info("queue processor stopped")
}
// coordinator manages per-project workers, starting new ones as projects are discovered.
func (p *QueueProcessor) coordinator() {
defer p.wg.Done()
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
ticker := time.NewTicker(p.pollPeriod)
defer ticker.Stop()
// Do an initial check
p.refreshProjectWorkers()
for {
select {
case <-p.ctx.Done():
// Stop all project workers
p.projectMu.Lock()
for projectID, cancel := range p.projectWorkers {
log.Debug("stopping worker", "project", projectID)
cancel()
}
p.projectMu.Unlock()
return
case <-ticker.C:
p.refreshProjectWorkers()
}
}
}
// refreshProjectWorkers ensures each known project has a worker.
func (p *QueueProcessor) refreshProjectWorkers() {
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
projects, err := p.projects.List(p.ctx)
if err != nil {
log.Warn("failed to list projects for queue processing", logging.FieldError, err)
return
}
p.projectMu.Lock()
defer p.projectMu.Unlock()
// Start workers for new projects
for _, project := range projects {
projectID := string(project.ID)
if _, exists := p.projectWorkers[projectID]; !exists {
workerCtx, workerCancel := context.WithCancel(p.ctx)
p.projectWorkers[projectID] = workerCancel
p.wg.Add(1)
go p.projectWorker(workerCtx, projectID)
log.Info("started queue worker", "project", projectID)
}
}
// Note: We don't remove workers for deleted projects to handle in-flight commands.
// They will naturally stop when their context is cancelled on shutdown.
}
// projectWorker processes commands for a single project.
func (p *QueueProcessor) projectWorker(ctx context.Context, projectID string) {
defer p.wg.Done()
ticker := time.NewTicker(p.pollPeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Try to dequeue and process a command
if err := p.processNextCommand(ctx, projectID); err != nil {
logging.FromContext(ctx).WithWorker("queue-processor").Warn("error processing command", "project", projectID, "error", err)
}
}
}
}
// processNextCommand dequeues and executes the next command for a project.
func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID string) error {
// Try to dequeue a command
cmd, err := p.queue.Dequeue(ctx, projectID)
if err != nil {
return err
}
if cmd == nil {
return nil // No commands pending
}
logging.FromContext(ctx).WithWorker("queue-processor").Info("processing queued command",
"command_id", cmd.ID,
"project", projectID,
"type", cmd.CommandType,
)
// Get the project to find the pod name
project, err := p.projects.Get(ctx, domain.ProjectID(projectID))
if err != nil {
// Update command as failed
result := &domain.QueuedCommandResult{
ExitCode: 1,
Error: "project not found: " + err.Error(),
}
_ = p.queue.UpdateStatus(ctx, cmd.ID, domain.QueueStatusFailed, result)
// Dispatch command.failed webhook
p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandFailed, &domain.CommandEventData{
CommandID: string(cmd.ID),
CommandType: cmd.CommandType,
ProjectID: projectID,
Error: "project not found: " + err.Error(),
})
return err
}
// Create a domain.Command for the executor
execCmd := &domain.Command{
ID: domain.CommandID(cmd.ID),
ProjectID: domain.ProjectID(projectID),
Type: cmd.CommandType,
StartedAt: time.Now(),
}
// Parse args based on command type
switch cmd.CommandType {
case domain.CommandTypeClaude:
execCmd.Args = []string{cmd.Command}
case domain.CommandTypeShell:
execCmd.Args = []string{cmd.Command}
case domain.CommandTypeGit:
// Git args are JSON-encoded
var gitArgs []string
if err := json.Unmarshal([]byte(cmd.Command), &gitArgs); err != nil {
// Fallback: treat as single arg
gitArgs = []string{cmd.Command}
}
execCmd.Args = gitArgs
}
// Stream ID for real-time output
streamID := string(cmd.ID)
// Dispatch command.started webhook
p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandStarted, &domain.CommandEventData{
CommandID: string(cmd.ID),
CommandType: cmd.CommandType,
ProjectID: projectID,
StartedAt: execCmd.StartedAt,
})
// Collect output
var outputBuilder strings.Builder
var outputMu sync.Mutex
// Execute the command
execCtx, execCancel := context.WithTimeout(ctx, TimeoutWorkExecution)
defer execCancel()
execResult, execErr := p.executor.Execute(execCtx, execCmd, project.PodName, func(line domain.OutputLine) {
// Publish to stream for real-time subscribers
p.streams.Publish(streamID, port.StreamEvent{
Type: "output",
Data: map[string]any{
"line": line.Line,
"stream": line.Stream,
},
})
// Collect output
outputMu.Lock()
if outputBuilder.Len() > 0 {
outputBuilder.WriteString("\n")
}
outputBuilder.WriteString(line.Line)
outputMu.Unlock()
})
// Determine final status and result
var finalStatus domain.QueueStatus
queueResult := &domain.QueuedCommandResult{
Output: outputBuilder.String(),
}
if execErr != nil {
finalStatus = domain.QueueStatusFailed
queueResult.ExitCode = 1
queueResult.Error = execErr.Error()
} else if execResult.ExitCode != 0 {
finalStatus = domain.QueueStatusFailed
queueResult.ExitCode = execResult.ExitCode
} else {
finalStatus = domain.QueueStatusCompleted
queueResult.ExitCode = 0
}
// Update command status
if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil {
logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to update command status", "command_id", cmd.ID, "error", err)
}
// Publish completion event
p.streams.Publish(streamID, port.StreamEvent{
Type: "complete",
Data: map[string]any{
"exit_code": queueResult.ExitCode,
"duration_ms": execResult.DurationMs,
"status": string(finalStatus),
},
})
// Dispatch command.completed or command.failed webhook
completedAt := time.Now()
var webhookEventType domain.WebhookEventType
if finalStatus == domain.QueueStatusCompleted {
webhookEventType = domain.WebhookEventCommandCompleted
} else {
webhookEventType = domain.WebhookEventCommandFailed
}
p.dispatchWebhookEvent(ctx, projectID, webhookEventType, &domain.CommandEventData{
CommandID: string(cmd.ID),
CommandType: cmd.CommandType,
ProjectID: projectID,
StartedAt: execCmd.StartedAt,
CompletedAt: completedAt,
ExitCode: queueResult.ExitCode,
DurationMs: execResult.DurationMs,
Error: queueResult.Error,
})
logging.FromContext(ctx).WithWorker("queue-processor").Info("completed queued command",
"command_id", cmd.ID,
"project", projectID,
"status", finalStatus,
"exit_code", queueResult.ExitCode,
)
// Clean up stream after delay
go func() {
time.Sleep(30 * time.Second)
p.streams.Close(streamID)
}()
return nil
}
// dispatchWebhookEvent dispatches a webhook event if a dispatcher is configured.
func (p *QueueProcessor) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) {
if p.webhookDispatcher == nil {
return
}
event := &domain.WebhookEvent{
Type: eventType,
Timestamp: time.Now(),
ProjectID: projectID,
Data: data,
}
if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil {
logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to dispatch webhook event",
"project_id", projectID,
"event_type", eventType,
"error", err,
)
}
}