rdev/internal/worker/work_executor.go
jordan bc47e426b0 feat: Add CI pipeline proxy, DNS alias management, and worker executor system
- Add ListPipelines/GetPipeline to CIProvider port with Woodpecker adapter
- Add DNS alias endpoints: GET/POST/DELETE /projects/{id}/domains
- Implement worker executor daemon, build executor, and git operations
- Add build service, worker service, and build audit tracking
- Add worker registry with PostgreSQL adapter and migration
- Add multi-provider code agent interface (Claude Code + OpenCode)
- Add create-and-build combo endpoint
- Update landing-page cookbook to reflect all gaps closed
- Fix tech debt: unified validation, auth scopes, error wrapping, slog patterns

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 21:05:28 -07:00

290 lines
6.9 KiB
Go

// Package worker provides background workers for async task processing.
package worker
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/service"
)
// WorkExecutor is a long-running daemon that registers itself as a worker,
// sends periodic heartbeats, polls the work queue for tasks, and dispatches
// each claimed task to a handler chosen by the task's type.
//
// Build one with NewWorkExecutor, call Start to begin work, and call Stop on
// shutdown.
type WorkExecutor struct {
	// Collaborators.
	workerSvc *service.WorkerService
	workSvc   *service.WorkService
	buildExec *BuildExecutor
	logger    *slog.Logger

	// Identity reported to the worker registry.
	workerID     string
	hostname     string
	version      string
	capabilities []string

	// Timing.
	pollPeriod  time.Duration // interval between task-claim attempts
	hbPeriod    time.Duration // interval between heartbeats
	taskTimeout time.Duration // upper bound on a single task's run time

	// Lifecycle.
	started int32              // accessed atomically; guards against a double Start
	ctx     context.Context    // cancelled by Stop
	cancel  context.CancelFunc // cancels ctx
	wg      sync.WaitGroup     // tracks the heartbeat and poll goroutines
}
// WorkExecutorConfig holds configuration for the work executor.
type WorkExecutorConfig struct {
// WorkerID uniquely identifies this executor instance.
// Defaults to HOSTNAME env var or "rdev-worker-0".
WorkerID string
// Version reported to the worker registry.
Version string
// Capabilities reported to the worker registry.
Capabilities []string
// PollPeriod is how often to check for new tasks.
PollPeriod time.Duration
// HeartbeatPeriod is how often to send heartbeats.
HeartbeatPeriod time.Duration
// TaskTimeout is the maximum time a single task may run.
// Default: 15 minutes.
TaskTimeout time.Duration
Logger *slog.Logger
}
// DefaultWorkExecutorConfig returns a fresh config with production defaults:
// the worker ID comes from $HOSTNAME (or "rdev-worker-0" when that is unset
// or empty), the only capability is "build", the executor polls every 5s,
// heartbeats every 30s, and tasks time out after 15 minutes. The logger is
// slog.Default() and Version is left empty.
func DefaultWorkExecutorConfig() *WorkExecutorConfig {
	cfg := &WorkExecutorConfig{
		WorkerID:        "rdev-worker-0",
		Capabilities:    []string{"build"},
		PollPeriod:      5 * time.Second,
		HeartbeatPeriod: 30 * time.Second,
		TaskTimeout:     15 * time.Minute,
		Logger:          slog.Default(),
	}
	if host := os.Getenv("HOSTNAME"); host != "" {
		cfg.WorkerID = host
	}
	return cfg
}
// NewWorkExecutor builds a work executor daemon. It neither registers the
// worker nor starts any goroutines; call Start for that.
//
// cfg may be nil, in which case DefaultWorkExecutorConfig is used. Zero-valued
// or invalid fields of a non-nil cfg are filled from those same defaults. A
// partially populated config is therefore safe; otherwise an empty WorkerID
// would register an empty ID, a nil Logger would panic here, and a
// non-positive poll or heartbeat period would make time.NewTicker panic in
// Start. cfg itself is never modified.
func NewWorkExecutor(
	workerSvc *service.WorkerService,
	workSvc *service.WorkService,
	buildExec *BuildExecutor,
	cfg *WorkExecutorConfig,
) *WorkExecutor {
	defaults := DefaultWorkExecutorConfig()
	if cfg == nil {
		cfg = defaults
	}

	workerID := cfg.WorkerID
	if workerID == "" {
		workerID = defaults.WorkerID
	}

	logger := cfg.Logger
	if logger == nil {
		logger = defaults.Logger
	}

	// Copy so later mutation of the caller's slice cannot change what this
	// executor reports to the registry.
	capabilities := append([]string(nil), cfg.Capabilities...)
	if len(capabilities) == 0 {
		capabilities = defaults.Capabilities
	}

	pollPeriod := cfg.PollPeriod
	if pollPeriod <= 0 {
		pollPeriod = defaults.PollPeriod
	}

	hbPeriod := cfg.HeartbeatPeriod
	if hbPeriod <= 0 {
		hbPeriod = defaults.HeartbeatPeriod
	}

	// A negative timeout would expire every task immediately; treat it like zero.
	taskTimeout := cfg.TaskTimeout
	if taskTimeout <= 0 {
		taskTimeout = defaults.TaskTimeout
	}

	// A hostname lookup failure is not fatal; fall back to the worker ID.
	hostname, err := os.Hostname()
	if err != nil || hostname == "" {
		hostname = workerID
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &WorkExecutor{
		workerSvc:    workerSvc,
		workSvc:      workSvc,
		buildExec:    buildExec,
		logger:       logger.With("component", "work-executor"),
		workerID:     workerID,
		hostname:     hostname,
		version:      cfg.Version,
		capabilities: capabilities,
		pollPeriod:   pollPeriod,
		hbPeriod:     hbPeriod,
		taskTimeout:  taskTimeout,
		ctx:          ctx,
		cancel:       cancel,
	}
}
// Start registers the worker with the registry and launches the heartbeat
// and poll goroutines. It returns as soon as they are running; call Stop to
// shut them down.
//
// A second call after a successful Start returns an error. If registration
// fails, the executor is left unstarted so Start may be retried. The
// returned error wraps the registry error (usable with errors.Is/As).
func (e *WorkExecutor) Start() error {
	if !atomic.CompareAndSwapInt32(&e.started, 0, 1) {
		return fmt.Errorf("executor already started")
	}

	// Register this worker in the pool.
	worker := &domain.Worker{
		ID:           e.workerID,
		Hostname:     e.hostname,
		Capabilities: e.capabilities,
		Version:      e.version,
	}
	if err := e.workerSvc.Register(e.ctx, worker); err != nil {
		// Nothing has been started yet, so clear the flag to allow a retry.
		atomic.StoreInt32(&e.started, 0)
		return fmt.Errorf("registering worker %q: %w", e.workerID, err)
	}

	e.logger.Info("work executor started",
		"worker_id", e.workerID,
		"poll_period", e.pollPeriod,
		"heartbeat_period", e.hbPeriod,
	)

	// Both loops exit when e.ctx is cancelled; Stop waits on e.wg for them.
	e.wg.Add(2)
	go e.heartbeatLoop()
	go e.pollLoop()

	return nil
}
// Stop shuts the executor down. It cancels the executor context, then waits
// for the heartbeat and poll goroutines to return; this includes any task
// currently running inside the poll loop. Finally it deregisters the worker
// on a best-effort basis, giving that call up to 5 seconds.
func (e *WorkExecutor) Stop() {
	e.logger.Info("work executor stopping", "worker_id", e.workerID)

	// Signal both loops, then block until they have returned.
	e.cancel()
	e.wg.Wait()

	// e.ctx is cancelled by now, so deregistration needs its own deadline.
	deregCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()

	err := e.workerSvc.Deregister(deregCtx, e.workerID)
	if err != nil {
		e.logger.Warn("failed to deregister worker", "error", err)
	}

	e.logger.Info("work executor stopped", "worker_id", e.workerID)
}
// WorkerID reports the identifier this executor registers under in the
// worker registry.
func (e *WorkExecutor) WorkerID() string { return e.workerID }
// Running reports whether the executor's context is still live, meaning Stop
// has not cancelled it. It also reports true for an executor that was
// constructed but never started.
func (e *WorkExecutor) Running() bool {
	if e.ctx.Err() != nil {
		return false
	}
	return true
}
// heartbeatLoop reports liveness to the worker registry once per hbPeriod
// until the executor context is cancelled. A failed heartbeat is logged and
// the loop keeps going; the next tick tries again.
func (e *WorkExecutor) heartbeatLoop() {
	defer e.wg.Done()

	tick := time.NewTicker(e.hbPeriod)
	defer tick.Stop()

	stop := e.ctx.Done()
	for {
		select {
		case <-stop:
			return
		case <-tick.C:
		}

		err := e.workerSvc.Heartbeat(e.ctx, e.workerID)
		if err != nil {
			e.logger.Warn("heartbeat failed", "error", err)
		}
	}
}
// pollLoop tries to claim and run one task every pollPeriod until the
// executor context is cancelled. Tasks run synchronously in this goroutine,
// so this executor runs at most one task at a time, and a long task delays
// the next poll.
func (e *WorkExecutor) pollLoop() {
	defer e.wg.Done()

	tick := time.NewTicker(e.pollPeriod)
	defer tick.Stop()

	stop := e.ctx.Done()
	for {
		select {
		case <-stop:
			return
		case <-tick.C:
		}

		e.tryClaimAndExecute()
	}
}
// tryClaimAndExecute claims at most one task from the queue and runs it under
// the per-task timeout. It then records the build metric and reports the
// outcome:
//   - success goes to WorkerService.CompleteTask;
//   - failure goes to WorkService.FailTask (which handles retry logic),
//     followed by a heartbeat to return the worker to idle.
//
// Outcome reporting deliberately uses its own short-lived context instead of
// e.ctx. When Stop cancels e.ctx mid-task, the task context is cancelled
// too, and the resulting failure must still reach the work service. With a
// cancelled context that report would itself fail, and the task would
// presumably remain claimed by this worker.
func (e *WorkExecutor) tryClaimAndExecute() {
	// Bound on each post-execution report; matches Stop's deregister timeout.
	const resultReportTimeout = 5 * time.Second

	task, err := e.workerSvc.ClaimTask(e.ctx, e.workerID)
	if err != nil {
		// A claim interrupted by shutdown is expected and not worth a warning.
		if e.ctx.Err() != nil {
			return
		}
		e.logger.Warn("failed to claim task", "error", err)
		return
	}
	if task == nil {
		return // No tasks available
	}

	e.logger.Info("executing task",
		"task_id", task.ID,
		"project_id", task.ProjectID,
		"type", task.Type,
	)

	taskCtx, taskCancel := context.WithTimeout(e.ctx, e.taskTimeout)
	defer taskCancel()

	result := e.executeTask(taskCtx, task)
	if result == nil {
		// Defensive: a handler returning nil would otherwise panic below.
		result = &domain.BuildResult{Success: false, Error: "executor returned no result"}
	}

	// Record build metrics.
	status := "success"
	if !result.Success {
		status = "failed"
	}
	metrics.RecordBuild(task.ProjectID, status, result.DurationMs)

	reportCtx, reportCancel := context.WithTimeout(context.Background(), resultReportTimeout)
	defer reportCancel()

	if result.Success {
		if err := e.workerSvc.CompleteTask(reportCtx, e.workerID, task.ID, result); err != nil {
			e.logger.Error("failed to complete task",
				"task_id", task.ID,
				"error", err,
			)
		}
		return
	}

	// Fail the task through the work service (handles retry logic).
	errMsg := result.Error
	if errMsg == "" {
		errMsg = "execution failed"
	}
	if err := e.workSvc.FailTask(reportCtx, task.ID, errMsg); err != nil {
		e.logger.Error("failed to record task failure",
			"task_id", task.ID,
			"error", err,
		)
	}

	// Return the worker to idle regardless of whether FailTask succeeded.
	if err := e.workerSvc.Heartbeat(reportCtx, e.workerID); err != nil {
		e.logger.Warn("failed to heartbeat after failure", "error", err)
	}
}
// executeTask dispatches task to the handler for its type and returns that
// handler's result. It never returns nil. Three cases become failed results,
// so the caller records them like any other failure instead of panicking:
//   - an unsupported task type;
//   - a build task when no BuildExecutor was supplied;
//   - a handler that returns nil.
func (e *WorkExecutor) executeTask(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
	var result *domain.BuildResult

	switch task.Type {
	case domain.WorkTaskTypeBuild:
		// NewWorkExecutor accepts a nil BuildExecutor; don't dereference it.
		if e.buildExec == nil {
			return &domain.BuildResult{
				Success: false,
				Error:   "build executor not configured",
			}
		}
		result = e.buildExec.Execute(ctx, task)
	default:
		return &domain.BuildResult{
			Success: false,
			Error:   "unsupported task type: " + string(task.Type),
		}
	}

	if result == nil {
		return &domain.BuildResult{
			Success: false,
			Error:   "build executor returned no result",
		}
	}
	return result
}