Worker template fixes: - Replace panic() with logger.Error() + os.Exit(1) for config errors - Remove double-timeout application (context + middleware) - Add error message truncation to prevent log bloat - Use named constants for shutdown grace period and stale check interval Skeleton pkg/auth fixes: - Fix error wrapping to use %w consistently in jwt.go - Add GetUserOrError() as safe alternative to MustGetUser() panic Skeleton pkg/queue fixes: - Check RowsAffected() errors instead of ignoring them - Add input validation to EnqueueWithOptions (require job type, cap retries) - Add log truncation for error messages - Fix inaccurate doc comment claiming exponential backoff Worker timeout consolidation: - Add internal/worker/timeouts.go with named constants - Migrate all workers to use timeout constants Cleanup: - Remove obsolete slack-preparation-thoughts.md files Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
368 lines
9.9 KiB
Go
368 lines
9.9 KiB
Go
// Package worker provides background workers for async task processing.
|
|
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/logging"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// QueueProcessor processes queued commands in the background.
|
|
type QueueProcessor struct {
|
|
queue port.CommandQueue
|
|
executor port.CommandExecutor
|
|
projects port.ProjectRepository
|
|
streams port.StreamPublisher
|
|
webhookDispatcher port.WebhookDispatcher
|
|
pollPeriod time.Duration
|
|
|
|
// Shutdown management
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
|
|
// Track active project workers
|
|
projectWorkers map[string]context.CancelFunc
|
|
projectMu sync.Mutex
|
|
}
|
|
|
|
// QueueProcessorConfig carries the tunable settings of a QueueProcessor.
type QueueProcessorConfig struct {
	// PollPeriod is how often the processor polls: it is used as the ticker
	// interval both for discovering projects and for dequeuing commands.
	PollPeriod time.Duration
}
|
|
|
|
// DefaultQueueProcessorConfig returns sensible defaults.
|
|
func DefaultQueueProcessorConfig() *QueueProcessorConfig {
|
|
return &QueueProcessorConfig{
|
|
PollPeriod: 5 * time.Second,
|
|
}
|
|
}
|
|
|
|
// NewQueueProcessor creates a new queue processor.
|
|
func NewQueueProcessor(
|
|
queue port.CommandQueue,
|
|
executor port.CommandExecutor,
|
|
projects port.ProjectRepository,
|
|
streams port.StreamPublisher,
|
|
cfg *QueueProcessorConfig,
|
|
) *QueueProcessor {
|
|
if cfg == nil {
|
|
cfg = DefaultQueueProcessorConfig()
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &QueueProcessor{
|
|
queue: queue,
|
|
executor: executor,
|
|
projects: projects,
|
|
streams: streams,
|
|
pollPeriod: cfg.PollPeriod,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
projectWorkers: make(map[string]context.CancelFunc),
|
|
}
|
|
}
|
|
|
|
// WithWebhookDispatcher sets a webhook dispatcher for event notifications.
|
|
func (p *QueueProcessor) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *QueueProcessor {
|
|
p.webhookDispatcher = dispatcher
|
|
return p
|
|
}
|
|
|
|
// Start begins processing the command queue.
|
|
// It spawns a worker for each known project.
|
|
func (p *QueueProcessor) Start() error {
|
|
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
|
|
log.Info("queue processor starting")
|
|
|
|
// Start the main coordinator that manages per-project workers
|
|
p.wg.Add(1)
|
|
go p.coordinator()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the queue processor.
|
|
func (p *QueueProcessor) Stop() {
|
|
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
|
|
log.Info("queue processor stopping")
|
|
p.cancel()
|
|
p.wg.Wait()
|
|
log.Info("queue processor stopped")
|
|
}
|
|
|
|
// coordinator manages per-project workers, starting new ones as projects are discovered.
|
|
func (p *QueueProcessor) coordinator() {
|
|
defer p.wg.Done()
|
|
|
|
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
|
|
ticker := time.NewTicker(p.pollPeriod)
|
|
defer ticker.Stop()
|
|
|
|
// Do an initial check
|
|
p.refreshProjectWorkers()
|
|
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
// Stop all project workers
|
|
p.projectMu.Lock()
|
|
for projectID, cancel := range p.projectWorkers {
|
|
log.Debug("stopping worker", "project", projectID)
|
|
cancel()
|
|
}
|
|
p.projectMu.Unlock()
|
|
return
|
|
case <-ticker.C:
|
|
p.refreshProjectWorkers()
|
|
}
|
|
}
|
|
}
|
|
|
|
// refreshProjectWorkers ensures each known project has a worker.
|
|
func (p *QueueProcessor) refreshProjectWorkers() {
|
|
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
|
|
|
|
projects, err := p.projects.List(p.ctx)
|
|
if err != nil {
|
|
log.Warn("failed to list projects for queue processing", logging.FieldError, err)
|
|
return
|
|
}
|
|
|
|
p.projectMu.Lock()
|
|
defer p.projectMu.Unlock()
|
|
|
|
// Start workers for new projects
|
|
for _, project := range projects {
|
|
projectID := string(project.ID)
|
|
if _, exists := p.projectWorkers[projectID]; !exists {
|
|
workerCtx, workerCancel := context.WithCancel(p.ctx)
|
|
p.projectWorkers[projectID] = workerCancel
|
|
p.wg.Add(1)
|
|
go p.projectWorker(workerCtx, projectID)
|
|
log.Info("started queue worker", "project", projectID)
|
|
}
|
|
}
|
|
|
|
// Note: We don't remove workers for deleted projects to handle in-flight commands.
|
|
// They will naturally stop when their context is cancelled on shutdown.
|
|
}
|
|
|
|
// projectWorker processes commands for a single project.
|
|
func (p *QueueProcessor) projectWorker(ctx context.Context, projectID string) {
|
|
defer p.wg.Done()
|
|
|
|
ticker := time.NewTicker(p.pollPeriod)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Try to dequeue and process a command
|
|
if err := p.processNextCommand(ctx, projectID); err != nil {
|
|
logging.FromContext(ctx).WithWorker("queue-processor").Warn("error processing command", "project", projectID, "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processNextCommand dequeues and executes the next command for a project.
|
|
func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID string) error {
|
|
// Try to dequeue a command
|
|
cmd, err := p.queue.Dequeue(ctx, projectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cmd == nil {
|
|
return nil // No commands pending
|
|
}
|
|
|
|
logging.FromContext(ctx).WithWorker("queue-processor").Info("processing queued command",
|
|
"command_id", cmd.ID,
|
|
"project", projectID,
|
|
"type", cmd.CommandType,
|
|
)
|
|
|
|
// Get the project to find the pod name
|
|
project, err := p.projects.Get(ctx, domain.ProjectID(projectID))
|
|
if err != nil {
|
|
// Update command as failed
|
|
result := &domain.QueuedCommandResult{
|
|
ExitCode: 1,
|
|
Error: "project not found: " + err.Error(),
|
|
}
|
|
_ = p.queue.UpdateStatus(ctx, cmd.ID, domain.QueueStatusFailed, result)
|
|
|
|
// Dispatch command.failed webhook
|
|
p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandFailed, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.CommandType,
|
|
ProjectID: projectID,
|
|
Error: "project not found: " + err.Error(),
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Create a domain.Command for the executor
|
|
execCmd := &domain.Command{
|
|
ID: domain.CommandID(cmd.ID),
|
|
ProjectID: domain.ProjectID(projectID),
|
|
Type: cmd.CommandType,
|
|
StartedAt: time.Now(),
|
|
}
|
|
|
|
// Parse args based on command type
|
|
switch cmd.CommandType {
|
|
case domain.CommandTypeClaude:
|
|
execCmd.Args = []string{cmd.Command}
|
|
case domain.CommandTypeShell:
|
|
execCmd.Args = []string{cmd.Command}
|
|
case domain.CommandTypeGit:
|
|
// Git args are JSON-encoded
|
|
var gitArgs []string
|
|
if err := json.Unmarshal([]byte(cmd.Command), &gitArgs); err != nil {
|
|
// Fallback: treat as single arg
|
|
gitArgs = []string{cmd.Command}
|
|
}
|
|
execCmd.Args = gitArgs
|
|
}
|
|
|
|
// Stream ID for real-time output
|
|
streamID := string(cmd.ID)
|
|
|
|
// Dispatch command.started webhook
|
|
p.dispatchWebhookEvent(ctx, projectID, domain.WebhookEventCommandStarted, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.CommandType,
|
|
ProjectID: projectID,
|
|
StartedAt: execCmd.StartedAt,
|
|
})
|
|
|
|
// Collect output
|
|
var outputBuilder strings.Builder
|
|
var outputMu sync.Mutex
|
|
|
|
// Execute the command
|
|
execCtx, execCancel := context.WithTimeout(ctx, TimeoutWorkExecution)
|
|
defer execCancel()
|
|
|
|
execResult, execErr := p.executor.Execute(execCtx, execCmd, project.PodName, func(line domain.OutputLine) {
|
|
// Publish to stream for real-time subscribers
|
|
p.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "output",
|
|
Data: map[string]any{
|
|
"line": line.Line,
|
|
"stream": line.Stream,
|
|
},
|
|
})
|
|
|
|
// Collect output
|
|
outputMu.Lock()
|
|
if outputBuilder.Len() > 0 {
|
|
outputBuilder.WriteString("\n")
|
|
}
|
|
outputBuilder.WriteString(line.Line)
|
|
outputMu.Unlock()
|
|
})
|
|
|
|
// Determine final status and result
|
|
var finalStatus domain.QueueStatus
|
|
queueResult := &domain.QueuedCommandResult{
|
|
Output: outputBuilder.String(),
|
|
}
|
|
|
|
if execErr != nil {
|
|
finalStatus = domain.QueueStatusFailed
|
|
queueResult.ExitCode = 1
|
|
queueResult.Error = execErr.Error()
|
|
} else if execResult.ExitCode != 0 {
|
|
finalStatus = domain.QueueStatusFailed
|
|
queueResult.ExitCode = execResult.ExitCode
|
|
} else {
|
|
finalStatus = domain.QueueStatusCompleted
|
|
queueResult.ExitCode = 0
|
|
}
|
|
|
|
// Update command status
|
|
if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil {
|
|
logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to update command status", "command_id", cmd.ID, "error", err)
|
|
}
|
|
|
|
// Publish completion event
|
|
p.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "complete",
|
|
Data: map[string]any{
|
|
"exit_code": queueResult.ExitCode,
|
|
"duration_ms": execResult.DurationMs,
|
|
"status": string(finalStatus),
|
|
},
|
|
})
|
|
|
|
// Dispatch command.completed or command.failed webhook
|
|
completedAt := time.Now()
|
|
var webhookEventType domain.WebhookEventType
|
|
if finalStatus == domain.QueueStatusCompleted {
|
|
webhookEventType = domain.WebhookEventCommandCompleted
|
|
} else {
|
|
webhookEventType = domain.WebhookEventCommandFailed
|
|
}
|
|
|
|
p.dispatchWebhookEvent(ctx, projectID, webhookEventType, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.CommandType,
|
|
ProjectID: projectID,
|
|
StartedAt: execCmd.StartedAt,
|
|
CompletedAt: completedAt,
|
|
ExitCode: queueResult.ExitCode,
|
|
DurationMs: execResult.DurationMs,
|
|
Error: queueResult.Error,
|
|
})
|
|
|
|
logging.FromContext(ctx).WithWorker("queue-processor").Info("completed queued command",
|
|
"command_id", cmd.ID,
|
|
"project", projectID,
|
|
"status", finalStatus,
|
|
"exit_code", queueResult.ExitCode,
|
|
)
|
|
|
|
// Clean up stream after delay
|
|
go func() {
|
|
time.Sleep(30 * time.Second)
|
|
p.streams.Close(streamID)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// dispatchWebhookEvent dispatches a webhook event if a dispatcher is configured.
|
|
func (p *QueueProcessor) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) {
|
|
if p.webhookDispatcher == nil {
|
|
return
|
|
}
|
|
|
|
event := &domain.WebhookEvent{
|
|
Type: eventType,
|
|
Timestamp: time.Now(),
|
|
ProjectID: projectID,
|
|
Data: data,
|
|
}
|
|
|
|
if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil {
|
|
logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to dispatch webhook event",
|
|
"project_id", projectID,
|
|
"event_type", eventType,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|