All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Add TimeoutAgentExecution (22m) to handlers for synchronous SDLC
execution, and TimeoutAgent{Default,Medium,Heavy} (12/22/47m) to
workers for tiered agent task execution. Aligns with SDLC action
complexity tiers and prevents inline duration literals.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
319 lines
7.9 KiB
Go
319 lines
7.9 KiB
Go
// Package worker provides background workers for async task processing.
|
|
package worker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/logging"
|
|
"github.com/orchard9/rdev/internal/metrics"
|
|
"github.com/orchard9/rdev/internal/service"
|
|
)
|
|
|
|
// WorkExecutor is a background daemon that polls the work queue for tasks
|
|
// and executes them via task-type-specific handlers. It self-registers as
|
|
// a worker, sends heartbeats, and reports results.
|
|
type WorkExecutor struct {
|
|
workerSvc *service.WorkerService
|
|
workSvc *service.WorkService
|
|
buildExec *BuildExecutor
|
|
verifyExec *VerifyExecutor
|
|
sdlcExec *SDLCTaskExecutor
|
|
|
|
workerID string
|
|
hostname string
|
|
version string
|
|
capabilities []string
|
|
pollPeriod time.Duration
|
|
hbPeriod time.Duration
|
|
taskTimeout time.Duration
|
|
|
|
started int32 // atomic flag to prevent double-start
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// WorkExecutorConfig holds configuration for the work executor.
//
// DefaultWorkExecutorConfig returns a fully populated value; passing a nil
// config to NewWorkExecutor is equivalent to passing that default.
type WorkExecutorConfig struct {
	// WorkerID uniquely identifies this executor instance in the worker
	// registry. DefaultWorkExecutorConfig sets it from the HOSTNAME env var,
	// falling back to "rdev-worker-0".
	WorkerID string

	// Version reported to the worker registry.
	Version string

	// Capabilities reported to the worker registry. If empty,
	// NewWorkExecutor reports ["build"].
	Capabilities []string

	// PollPeriod is how often to check for new tasks.
	// DefaultWorkExecutorConfig uses 5 seconds.
	PollPeriod time.Duration

	// HeartbeatPeriod is how often to send heartbeats.
	// DefaultWorkExecutorConfig uses 30 seconds.
	HeartbeatPeriod time.Duration

	// TaskTimeout is the maximum time a single task may run when its spec
	// does not carry a positive "timeout_seconds" value. A task that does
	// carry one gets that many seconds plus two minutes of headroom instead.
	// DefaultWorkExecutorConfig uses 50 minutes. If the field is left zero,
	// NewWorkExecutor falls back to 15 minutes.
	TaskTimeout time.Duration
}
|
|
|
|
// DefaultWorkExecutorConfig returns sensible defaults.
|
|
func DefaultWorkExecutorConfig() *WorkExecutorConfig {
|
|
workerID := os.Getenv("HOSTNAME")
|
|
if workerID == "" {
|
|
workerID = "rdev-worker-0"
|
|
}
|
|
return &WorkExecutorConfig{
|
|
WorkerID: workerID,
|
|
Capabilities: []string{"build"},
|
|
PollPeriod: 5 * time.Second,
|
|
HeartbeatPeriod: 30 * time.Second,
|
|
TaskTimeout: 50 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// NewWorkExecutor creates a new work executor daemon.
|
|
func NewWorkExecutor(
|
|
workerSvc *service.WorkerService,
|
|
workSvc *service.WorkService,
|
|
buildExec *BuildExecutor,
|
|
verifyExec *VerifyExecutor,
|
|
sdlcExec *SDLCTaskExecutor,
|
|
cfg *WorkExecutorConfig,
|
|
) *WorkExecutor {
|
|
if cfg == nil {
|
|
cfg = DefaultWorkExecutorConfig()
|
|
}
|
|
|
|
hostname, _ := os.Hostname()
|
|
if hostname == "" {
|
|
hostname = cfg.WorkerID
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
capabilities := cfg.Capabilities
|
|
if len(capabilities) == 0 {
|
|
capabilities = []string{"build"}
|
|
}
|
|
|
|
taskTimeout := cfg.TaskTimeout
|
|
if taskTimeout == 0 {
|
|
taskTimeout = 15 * time.Minute
|
|
}
|
|
|
|
return &WorkExecutor{
|
|
workerSvc: workerSvc,
|
|
workSvc: workSvc,
|
|
buildExec: buildExec,
|
|
verifyExec: verifyExec,
|
|
sdlcExec: sdlcExec,
|
|
workerID: cfg.WorkerID,
|
|
hostname: hostname,
|
|
version: cfg.Version,
|
|
capabilities: capabilities,
|
|
pollPeriod: cfg.PollPeriod,
|
|
hbPeriod: cfg.HeartbeatPeriod,
|
|
taskTimeout: taskTimeout,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// Start registers the worker and begins the poll and heartbeat loops.
|
|
func (e *WorkExecutor) Start() error {
|
|
if !atomic.CompareAndSwapInt32(&e.started, 0, 1) {
|
|
return fmt.Errorf("executor already started")
|
|
}
|
|
|
|
log := logging.FromContext(e.ctx).WithWorker("work-executor")
|
|
|
|
// Register this worker in the pool
|
|
worker := &domain.Worker{
|
|
ID: e.workerID,
|
|
Hostname: e.hostname,
|
|
Capabilities: e.capabilities,
|
|
Version: e.version,
|
|
}
|
|
if err := e.workerSvc.Register(e.ctx, worker); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("work executor started",
|
|
"worker_id", e.workerID,
|
|
"poll_period", e.pollPeriod,
|
|
"heartbeat_period", e.hbPeriod,
|
|
)
|
|
|
|
// Start heartbeat loop
|
|
e.wg.Add(1)
|
|
go e.heartbeatLoop()
|
|
|
|
// Start poll loop
|
|
e.wg.Add(1)
|
|
go e.pollLoop()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the executor.
|
|
func (e *WorkExecutor) Stop() {
|
|
log := logging.FromContext(e.ctx).WithWorker("work-executor")
|
|
log.Info("work executor stopping", "worker_id", e.workerID)
|
|
e.cancel()
|
|
e.wg.Wait()
|
|
|
|
// Deregister (best-effort, context is cancelled so use a fresh one)
|
|
ctx, cancel := context.WithTimeout(context.Background(), TimeoutQuickOp)
|
|
defer cancel()
|
|
if err := e.workerSvc.Deregister(ctx, e.workerID); err != nil {
|
|
logging.FromContext(ctx).WithWorker("work-executor").Warn("failed to deregister worker", "error", err)
|
|
}
|
|
|
|
log.Info("work executor stopped", "worker_id", e.workerID)
|
|
}
|
|
|
|
// WorkerID returns the executor's worker ID.
|
|
func (e *WorkExecutor) WorkerID() string {
|
|
return e.workerID
|
|
}
|
|
|
|
// Running returns true if the executor context has not been cancelled.
|
|
func (e *WorkExecutor) Running() bool {
|
|
return e.ctx.Err() == nil
|
|
}
|
|
|
|
// heartbeatLoop sends periodic heartbeats to the worker registry.
|
|
func (e *WorkExecutor) heartbeatLoop() {
|
|
defer e.wg.Done()
|
|
|
|
log := logging.FromContext(e.ctx).WithWorker("work-executor")
|
|
ticker := time.NewTicker(e.hbPeriod)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil {
|
|
log.Warn("heartbeat failed", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// pollLoop checks for available tasks on a ticker.
|
|
func (e *WorkExecutor) pollLoop() {
|
|
defer e.wg.Done()
|
|
|
|
ticker := time.NewTicker(e.pollPeriod)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
e.tryClaimAndExecute()
|
|
}
|
|
}
|
|
}
|
|
|
|
// tryClaimAndExecute attempts to claim a task and execute it.
|
|
func (e *WorkExecutor) tryClaimAndExecute() {
|
|
log := logging.FromContext(e.ctx).WithWorker("work-executor")
|
|
|
|
task, err := e.workerSvc.ClaimTask(e.ctx, e.workerID)
|
|
if err != nil {
|
|
log.Warn("failed to claim task", "error", err)
|
|
return
|
|
}
|
|
if task == nil {
|
|
return // No tasks available
|
|
}
|
|
|
|
log.Info("executing task",
|
|
"task_id", task.ID,
|
|
"project_id", task.ProjectID,
|
|
"type", task.Type,
|
|
)
|
|
|
|
taskCtx, taskCancel := context.WithTimeout(e.ctx, e.taskTimeoutFor(task))
|
|
defer taskCancel()
|
|
|
|
result := e.executeTask(taskCtx, task)
|
|
|
|
// Record build metrics
|
|
status := "success"
|
|
if !result.Success {
|
|
status = "failed"
|
|
}
|
|
metrics.RecordBuild(task.ProjectID, status, result.DurationMs)
|
|
|
|
if result.Success {
|
|
if err := e.workerSvc.CompleteTask(e.ctx, e.workerID, task.ID, result); err != nil {
|
|
log.Error("failed to complete task",
|
|
"task_id", task.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
} else {
|
|
// Fail the task through worker service (updates audit + handles retry logic)
|
|
if result.Error == "" {
|
|
result.Error = "execution failed"
|
|
}
|
|
|
|
if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil {
|
|
log.Error("failed to record task failure",
|
|
"task_id", task.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// taskTimeoutFor returns the timeout for a specific task, derived from its spec
|
|
// if available, falling back to the configured default.
|
|
func (e *WorkExecutor) taskTimeoutFor(task *domain.WorkTask) time.Duration {
|
|
if timeoutSec, ok := task.Spec["timeout_seconds"].(float64); ok && timeoutSec > 0 {
|
|
// Add 2 minutes headroom for git clone/push around the agent execution
|
|
return time.Duration(timeoutSec)*time.Second + 2*time.Minute
|
|
}
|
|
return e.taskTimeout
|
|
}
|
|
|
|
// executeTask routes a task to the appropriate handler based on its type.
|
|
func (e *WorkExecutor) executeTask(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
|
|
switch task.Type {
|
|
case domain.WorkTaskTypeBuild:
|
|
return e.buildExec.Execute(ctx, task)
|
|
case domain.WorkTaskTypeVerify:
|
|
if e.verifyExec == nil {
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: "verify executor not configured",
|
|
}
|
|
}
|
|
return e.verifyExec.Execute(ctx, task)
|
|
case domain.WorkTaskTypeSDLC:
|
|
if e.sdlcExec == nil {
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: "sdlc executor not configured",
|
|
}
|
|
}
|
|
return e.sdlcExec.Execute(ctx, task)
|
|
default:
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: "unsupported task type: " + string(task.Type),
|
|
}
|
|
}
|
|
}
|