rdev/cmd/rdev-worker/main.go
jordan d7a6f37593
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix: worker graceful shutdown and RWO PVC compatibility
- Add WaitGroup for graceful shutdown of in-flight tasks
- Change replicas to 1 with Recreate strategy (RWO PVC limitation)
- Optimize Dockerfile: combine RUN commands for smaller layers
- Add compiled binaries to .gitignore

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 00:35:00 -07:00

285 lines
7.4 KiB
Go

// Package main provides the standalone rdev-worker binary.
// This worker runs as a separate container alongside a claudebox sidecar,
// polling the rdev-api for tasks and executing them via HTTP calls to the sidecar.
package main
import (
"context"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
claudeboxclient "github.com/orchard9/rdev/internal/adapter/claudebox"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/envutil"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/worker"
)
// version identifies this build of the worker; main sends it to rdev-api as
// part of the worker registration request. It defaults to "dev" and is set
// via ldflags at build time:
// go build -ldflags "-X main.version=v1.0.0" ./cmd/rdev-worker
var version = "dev"
// main wires up and runs the worker process.
//
// It configures JSON logging (debug level when envutil.GetEnvBool reports
// DEBUG as true), loads the configuration from the environment, registers the
// worker with rdev-api, starts the heartbeat and work-polling loops, and then
// blocks until SIGINT or SIGTERM arrives. On shutdown the shared context is
// cancelled and main waits up to cfg.TaskTimeout for the polling loop and
// every in-flight task to finish before returning.
func main() {
	// Configure logging
	logLevel := logging.LevelInfo
	if envutil.GetEnvBool("DEBUG", false) {
		logLevel = logging.LevelDebug
	}
	log := logging.New(logging.Config{
		Level:  logLevel,
		Format: logging.FormatJSON,
	}).WithWorker("rdev-worker")

	// Configuration from environment
	cfg := loadConfig()
	log.Info("starting rdev-worker",
		"worker_id", cfg.WorkerID,
		"rdev_api_url", cfg.RdevAPIURL,
		"claudebox_url", cfg.ClaudeboxURL,
		"poll_interval", cfg.PollInterval,
	)

	// Create API client for rdev-api
	apiClient := worker.NewAPIClient(worker.APIClientConfig{
		BaseURL: cfg.RdevAPIURL,
		APIKey:  cfg.APIKey,
		Timeout: 30 * time.Second,
	})

	// Create claudebox client for sidecar
	claudeboxClient := claudeboxclient.NewClient(claudeboxclient.ClientConfig{
		BaseURL: cfg.ClaudeboxURL,
		Timeout: 15 * time.Minute,
	})

	// Create context with cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Register worker. A hostname lookup failure is not fatal: the worker
	// still registers, just with whatever os.Hostname returned.
	hostname, err := os.Hostname()
	if err != nil {
		log.Warn("failed to determine hostname", logging.FieldError, err)
	}
	if err := apiClient.Register(ctx, &worker.RegisterRequest{
		ID:           cfg.WorkerID,
		Hostname:     hostname,
		Version:      version,
		Capabilities: cfg.Capabilities,
	}); err != nil {
		log.Error("failed to register worker", logging.FieldError, err)
		// os.Exit does not run deferred calls, so release the context here.
		cancel()
		os.Exit(1)
	}
	log.Info("worker registered", "worker_id", cfg.WorkerID)

	// Create executors
	buildExecutor := worker.NewHTTPBuildExecutor(worker.HTTPBuildExecutorConfig{
		ClaudeboxClient: claudeboxClient,
		WorkDir:         "/workspace",
	})
	sdlcExecutor := worker.NewHTTPSDLCTaskExecutor(worker.HTTPSDLCTaskExecutorConfig{
		ClaudeboxClient: claudeboxClient,
		WorkDir:         "/workspace",
	})

	// WaitGroup to track the work loop and in-flight tasks for graceful shutdown
	var wg sync.WaitGroup

	// Start heartbeat loop
	go runHeartbeat(ctx, apiClient, cfg.WorkerID, cfg.HeartbeatInterval, log)

	// Start work loop. The loop itself holds a WaitGroup slot for as long as
	// it runs: every wg.Add(1) it performs for a task therefore happens while
	// the counter is already positive, and wg.Wait below cannot return until
	// the loop has stopped claiming tasks. Without this, an Add could race
	// with Wait and a task claimed during shutdown could be missed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		runWorkLoop(ctx, apiClient, buildExecutor, sdlcExecutor, cfg, log, &wg)
	}()

	// Wait for shutdown signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Info("shutting down worker")
	cancel()

	// Wait for the work loop and in-flight tasks to complete with timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		log.Info("all tasks completed, worker stopped")
	case <-time.After(cfg.TaskTimeout):
		log.Warn("shutdown timeout, some tasks may be incomplete")
	}
}
// Config is the worker's runtime configuration. loadConfig builds it from
// environment variables.
type Config struct {
	// WorkerID is the identifier used when registering, sending heartbeats,
	// claiming tasks and reporting results.
	WorkerID string
	// RdevAPIURL is the base URL handed to the rdev-api client.
	RdevAPIURL string
	// ClaudeboxURL is the base URL handed to the claudebox sidecar client.
	ClaudeboxURL string
	// APIKey is the key handed to the rdev-api client.
	APIKey string
	// PollInterval is the period of the task-claiming ticker.
	PollInterval time.Duration
	// HeartbeatInterval is the period of the heartbeat ticker.
	HeartbeatInterval time.Duration
	// TaskTimeout bounds a single task execution and also the wait for
	// in-flight tasks during shutdown.
	TaskTimeout time.Duration
	// Capabilities is the capability list sent in the registration request.
	Capabilities []string
}
// loadConfig assembles the worker Config from environment variables.
//
// Variables read: WORKER_ID, RDEV_API_URL, CLAUDEBOX_URL, RDEV_API_KEY,
// WORKER_POLL_INTERVAL, WORKER_HEARTBEAT_INTERVAL, WORKER_TASK_TIMEOUT and
// WORKER_CAPABILITIES. The second argument of each envutil.GetEnv call is the
// fallback passed for that variable (presumably returned when the variable is
// unset — confirm against envutil).
func loadConfig() *Config {
	// The lookup error is deliberately ignored: the hostname only serves as
	// the fallback for WORKER_ID.
	host, _ := os.Hostname()

	cfg := &Config{}
	cfg.WorkerID = envutil.GetEnv("WORKER_ID", host)
	cfg.RdevAPIURL = envutil.GetEnv("RDEV_API_URL", "http://rdev-api.rdev.svc.cluster.local:8080")
	cfg.ClaudeboxURL = envutil.GetEnv("CLAUDEBOX_URL", "http://localhost:8080")
	cfg.APIKey = os.Getenv("RDEV_API_KEY")

	// Durations that fail to parse fall back to the same value used as the
	// textual default.
	cfg.PollInterval = parseDuration(envutil.GetEnv("WORKER_POLL_INTERVAL", "5s"), 5*time.Second)
	cfg.HeartbeatInterval = parseDuration(envutil.GetEnv("WORKER_HEARTBEAT_INTERVAL", "30s"), 30*time.Second)
	cfg.TaskTimeout = parseDuration(envutil.GetEnv("WORKER_TASK_TIMEOUT", "15m"), 15*time.Minute)

	cfg.Capabilities = parseCapabilities(os.Getenv("WORKER_CAPABILITIES"))
	return cfg
}
// parseDuration parses s as a Go duration string (e.g. "5s", "15m") and
// returns defaultVal when s is unusable.
//
// s is unusable when it does not parse or when it parses to a zero or
// negative duration. Non-positive values are rejected because the results
// feed time.NewTicker, which panics on them, and context.WithTimeout, which
// would expire immediately.
func parseDuration(s string, defaultVal time.Duration) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil || d <= 0 {
		return defaultVal
	}
	return d
}
// parseCapabilities parses a comma-separated list of capabilities.
//
// Each entry is trimmed of surrounding whitespace and empty entries are
// dropped. When s is empty, or contains no usable entry at all (for example
// " " or ","), the default set {"build", "sdlc"} is returned, so the caller
// never receives an empty list.
func parseCapabilities(s string) []string {
	parts := strings.Split(s, ",")
	caps := make([]string, 0, len(parts))
	for _, c := range parts {
		if c = strings.TrimSpace(c); c != "" {
			caps = append(caps, c)
		}
	}
	if len(caps) == 0 {
		// Nothing usable was configured: behave exactly as if unset.
		return []string{"build", "sdlc"}
	}
	return caps
}
// runHeartbeat calls client.Heartbeat for workerID once per interval until
// ctx is cancelled. A failed heartbeat is logged as a warning and the loop
// keeps going. interval must be positive, since time.NewTicker panics
// otherwise.
func runHeartbeat(ctx context.Context, client *worker.APIClient, workerID string, interval time.Duration, log *logging.Logger) {
	beat := time.NewTicker(interval)
	defer beat.Stop()

	stop := ctx.Done()
	for {
		select {
		case <-beat.C:
			err := client.Heartbeat(ctx, workerID)
			if err == nil {
				continue
			}
			log.Warn("heartbeat failed", logging.FieldError, err)
		case <-stop:
			return
		}
	}
}
// runWorkLoop polls rdev-api for work until ctx is cancelled.
//
// On every tick of cfg.PollInterval it asks client.ClaimTask for a task on
// behalf of cfg.WorkerID. A claim error is logged as a warning and a nil task
// means nothing was available; in both cases the loop simply waits for the
// next tick. Each claimed task is handed to executeTask on its own goroutine,
// which is registered with wg so the caller can wait for in-flight work.
func runWorkLoop(
	ctx context.Context,
	client *worker.APIClient,
	buildExecutor *worker.HTTPBuildExecutor,
	sdlcExecutor *worker.HTTPSDLCTaskExecutor,
	cfg *Config,
	log *logging.Logger,
	wg *sync.WaitGroup,
) {
	poll := time.NewTicker(cfg.PollInterval)
	defer poll.Stop()

	// dispatch runs one claimed task in the background, tracked by wg.
	dispatch := func(t *domain.WorkTask) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			executeTask(ctx, client, buildExecutor, sdlcExecutor, t, cfg, log)
		}()
	}

	for {
		// Block until either shutdown or the next poll tick.
		select {
		case <-ctx.Done():
			return
		case <-poll.C:
		}

		claimed, err := client.ClaimTask(ctx, cfg.WorkerID)
		switch {
		case err != nil:
			log.Warn("failed to claim task", logging.FieldError, err)
		case claimed != nil:
			log.Info("task claimed",
				"task_id", claimed.ID,
				logging.FieldProjectID, claimed.ProjectID,
				"type", claimed.Type,
			)
			dispatch(claimed)
		}
	}
}
// executeTask runs a single claimed task and reports its outcome to rdev-api.
//
// The task is executed under a context derived from ctx and bounded by
// cfg.TaskTimeout. Build tasks go to buildExecutor, SDLC tasks to
// sdlcExecutor; any other task type is turned into a failed result without
// running anything. A successful result is reported with client.CompleteTask,
// a failed one with client.FailTask. Reporting errors are logged, not
// returned.
func executeTask(
	ctx context.Context,
	client *worker.APIClient,
	buildExecutor *worker.HTTPBuildExecutor,
	sdlcExecutor *worker.HTTPSDLCTaskExecutor,
	task *domain.WorkTask,
	cfg *Config,
	log *logging.Logger,
) {
	// reportTimeout bounds the call that delivers the result to rdev-api.
	const reportTimeout = 30 * time.Second

	// Create task context with timeout
	taskCtx, cancel := context.WithTimeout(ctx, cfg.TaskTimeout)
	defer cancel()

	var result *domain.BuildResult
	switch task.Type {
	case domain.WorkTaskTypeBuild:
		result = buildExecutor.Execute(taskCtx, task)
	case domain.WorkTaskTypeSDLC:
		result = sdlcExecutor.Execute(taskCtx, task)
	default:
		result = &domain.BuildResult{
			Success: false,
			Error:   "unsupported task type: " + string(task.Type),
		}
	}

	// Defensive: a nil result would panic below and take down the whole
	// worker process, so report it as a failure instead.
	if result == nil {
		result = &domain.BuildResult{
			Success: false,
			Error:   "executor returned no result",
		}
	}

	// Report on a context that is detached from ctx. During shutdown ctx is
	// already cancelled by the time a task finishes, and reporting on it
	// would fail immediately, so the outcome would never reach rdev-api.
	reportCtx, cancelReport := context.WithTimeout(context.Background(), reportTimeout)
	defer cancelReport()

	if !result.Success {
		if err := client.FailTask(reportCtx, cfg.WorkerID, task.ID, result.Error, result.Output, result.DurationMs); err != nil {
			log.Error("failed to report task failure", "task_id", task.ID, logging.FieldError, err)
			return
		}
		log.Warn("task failed",
			"task_id", task.ID,
			"error", result.Error,
			"duration_ms", result.DurationMs,
		)
		return
	}

	if err := client.CompleteTask(reportCtx, cfg.WorkerID, task.ID, result); err != nil {
		log.Error("failed to complete task", "task_id", task.ID, logging.FieldError, err)
		return
	}
	log.Info("task completed",
		"task_id", task.ID,
		"duration_ms", result.DurationMs,
	)
}