// Package worker provides background workers for async task processing. package worker import ( "context" "encoding/json" "log/slog" "strings" "sync" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // QueueProcessor processes queued commands in the background. type QueueProcessor struct { queue port.CommandQueue executor port.CommandExecutor projects port.ProjectRepository streams port.StreamPublisher webhookDispatcher port.WebhookDispatcher logger *slog.Logger pollPeriod time.Duration // Shutdown management ctx context.Context cancel context.CancelFunc wg sync.WaitGroup // Track active project workers projectWorkers map[string]context.CancelFunc projectMu sync.Mutex } // QueueProcessorConfig holds configuration for the queue processor. type QueueProcessorConfig struct { PollPeriod time.Duration Logger *slog.Logger } // DefaultQueueProcessorConfig returns sensible defaults. func DefaultQueueProcessorConfig() *QueueProcessorConfig { return &QueueProcessorConfig{ PollPeriod: 5 * time.Second, Logger: slog.Default(), } } // NewQueueProcessor creates a new queue processor. func NewQueueProcessor( queue port.CommandQueue, executor port.CommandExecutor, projects port.ProjectRepository, streams port.StreamPublisher, cfg *QueueProcessorConfig, ) *QueueProcessor { if cfg == nil { cfg = DefaultQueueProcessorConfig() } ctx, cancel := context.WithCancel(context.Background()) return &QueueProcessor{ queue: queue, executor: executor, projects: projects, streams: streams, logger: cfg.Logger, pollPeriod: cfg.PollPeriod, ctx: ctx, cancel: cancel, projectWorkers: make(map[string]context.CancelFunc), } } // WithWebhookDispatcher sets a webhook dispatcher for event notifications. func (p *QueueProcessor) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *QueueProcessor { p.webhookDispatcher = dispatcher return p } // Start begins processing the command queue. // It spawns a worker for each known project. func (p *QueueProcessor) Start() error { p.logger.Info("queue processor starting") // Start the main coordinator that manages per-project workers p.wg.Add(1) go p.coordinator() return nil } // Stop gracefully shuts down the queue processor. func (p *QueueProcessor) Stop() { p.logger.Info("queue processor stopping") p.cancel() p.wg.Wait() p.logger.Info("queue processor stopped") } // coordinator manages per-project workers, starting new ones as projects are discovered. func (p *QueueProcessor) coordinator() { defer p.wg.Done() ticker := time.NewTicker(p.pollPeriod) defer ticker.Stop() // Do an initial check p.refreshProjectWorkers() for { select { case <-p.ctx.Done(): // Stop all project workers p.projectMu.Lock() for projectID, cancel := range p.projectWorkers { p.logger.Debug("stopping worker", "project", projectID) cancel() } p.projectMu.Unlock() return case <-ticker.C: p.refreshProjectWorkers() } } } // refreshProjectWorkers ensures each known project has a worker. func (p *QueueProcessor) refreshProjectWorkers() { projects, err := p.projects.List(p.ctx) if err != nil { p.logger.Warn("failed to list projects for queue processing", "error", err) return } p.projectMu.Lock() defer p.projectMu.Unlock() // Start workers for new projects for _, project := range projects { projectID := string(project.ID) if _, exists := p.projectWorkers[projectID]; !exists { workerCtx, workerCancel := context.WithCancel(p.ctx) p.projectWorkers[projectID] = workerCancel p.wg.Add(1) go p.projectWorker(workerCtx, projectID) p.logger.Info("started queue worker", "project", projectID) } } // Note: We don't remove workers for deleted projects to handle in-flight commands. // They will naturally stop when their context is cancelled on shutdown. } // projectWorker processes commands for a single project. func (p *QueueProcessor) projectWorker(ctx context.Context, projectID string) { defer p.wg.Done() ticker := time.NewTicker(p.pollPeriod) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: // Try to dequeue and process a command if err := p.processNextCommand(ctx, projectID); err != nil { p.logger.Warn("error processing command", "project", projectID, "error", err) } } } } // processNextCommand dequeues and executes the next command for a project. func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID string) error { // Try to dequeue a command cmd, err := p.queue.Dequeue(ctx, projectID) if err != nil { return err } if cmd == nil { return nil // No commands pending } p.logger.Info("processing queued command", "command_id", cmd.ID, "project", projectID, "type", cmd.CommandType, ) // Get the project to find the pod name project, err := p.projects.Get(ctx, domain.ProjectID(projectID)) if err != nil { // Update command as failed result := &domain.QueuedCommandResult{ ExitCode: 1, Error: "project not found: " + err.Error(), } _ = p.queue.UpdateStatus(ctx, cmd.ID, domain.QueueStatusFailed, result) // Dispatch command.failed webhook p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandFailed, &domain.CommandEventData{ CommandID: string(cmd.ID), CommandType: cmd.CommandType, ProjectID: projectID, Error: "project not found: " + err.Error(), }) return err } // Create a domain.Command for the executor execCmd := &domain.Command{ ID: domain.CommandID(cmd.ID), ProjectID: domain.ProjectID(projectID), Type: cmd.CommandType, StartedAt: time.Now(), } // Parse args based on command type switch cmd.CommandType { case domain.CommandTypeClaude: execCmd.Args = []string{cmd.Command} case domain.CommandTypeShell: execCmd.Args = []string{cmd.Command} case domain.CommandTypeGit: // Git args are JSON-encoded var gitArgs []string if err := json.Unmarshal([]byte(cmd.Command), &gitArgs); err != nil { // Fallback: treat as single arg gitArgs = []string{cmd.Command} } execCmd.Args = gitArgs } // Stream ID for real-time output streamID := string(cmd.ID) // Dispatch command.started webhook p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandStarted, &domain.CommandEventData{ CommandID: string(cmd.ID), CommandType: cmd.CommandType, ProjectID: projectID, StartedAt: execCmd.StartedAt, }) // Collect output var outputBuilder strings.Builder var outputMu sync.Mutex // Execute the command execCtx, execCancel := context.WithTimeout(ctx, 10*time.Minute) defer execCancel() execResult, execErr := p.executor.Execute(execCtx, execCmd, project.PodName, func(line domain.OutputLine) { // Publish to stream for real-time subscribers p.streams.Publish(streamID, port.StreamEvent{ Type: "output", Data: map[string]any{ "line": line.Line, "stream": line.Stream, }, }) // Collect output outputMu.Lock() if outputBuilder.Len() > 0 { outputBuilder.WriteString("\n") } outputBuilder.WriteString(line.Line) outputMu.Unlock() }) // Determine final status and result var finalStatus domain.QueueStatus queueResult := &domain.QueuedCommandResult{ Output: outputBuilder.String(), } if execErr != nil { finalStatus = domain.QueueStatusFailed queueResult.ExitCode = 1 queueResult.Error = execErr.Error() } else if execResult.ExitCode != 0 { finalStatus = domain.QueueStatusFailed queueResult.ExitCode = execResult.ExitCode } else { finalStatus = domain.QueueStatusCompleted queueResult.ExitCode = 0 } // Update command status if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil { p.logger.Warn("failed to update command status", "command_id", cmd.ID, "error", err) } // Publish completion event p.streams.Publish(streamID, port.StreamEvent{ Type: "complete", Data: map[string]any{ "exit_code": queueResult.ExitCode, "duration_ms": execResult.DurationMs, "status": string(finalStatus), }, }) // Dispatch command.completed or command.failed webhook completedAt := time.Now() var webhookEventType domain.WebhookEventType if finalStatus == domain.QueueStatusCompleted { webhookEventType = domain.WebhookEventCommandCompleted } else { webhookEventType = domain.WebhookEventCommandFailed } p.dispatchWebhookEvent(ctx, projectID, webhookEventType, &domain.CommandEventData{ CommandID: string(cmd.ID), CommandType: cmd.CommandType, ProjectID: projectID, StartedAt: execCmd.StartedAt, CompletedAt: completedAt, ExitCode: queueResult.ExitCode, DurationMs: execResult.DurationMs, Error: queueResult.Error, }) p.logger.Info("completed queued command", "command_id", cmd.ID, "project", projectID, "status", finalStatus, "exit_code", queueResult.ExitCode, ) // Clean up stream after delay go func() { time.Sleep(30 * time.Second) p.streams.Close(streamID) }() return nil } // dispatchWebhookEvent dispatches a webhook event if a dispatcher is configured. func (p *QueueProcessor) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) { if p.webhookDispatcher == nil { return } event := &domain.WebhookEvent{ Type: eventType, Timestamp: time.Now(), ProjectID: projectID, Data: data, } if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { p.logger.Warn("failed to dispatch webhook event", "project_id", projectID, "event_type", eventType, "error", err, ) } }