rdev/internal/service/worker_service.go
jordan 4a18b1cd07 fix: Persist build audit status when worker claims task
Root cause: WorkerService.ClaimTask() was modifying the audit entry
in memory but never persisting it to the database. This caused build
tasks to remain stuck at "pending" status even after being claimed.

Changes:
- Add UpdateStatus method to port.BuildAudit interface
- Implement UpdateStatus in postgres.BuildAuditRepository
- Fix ClaimTask to call audit.UpdateStatus() to persist status
- Add test coverage for audit update during task claim
- Update all mock implementations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 21:25:04 -07:00

234 lines
6.0 KiB
Go

// Package service provides business logic services.
package service
import (
"context"
"log/slog"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// Timing defaults used by the worker health checker.
const (
	// DefaultHeartbeatInterval is the period between health-checker sweeps.
	DefaultHeartbeatInterval time.Duration = 30 * time.Second

	// DefaultStaleThreshold is the heartbeat age limit handed to the registry
	// when the health checker asks it to mark stale workers offline.
	DefaultStaleThreshold time.Duration = 90 * time.Second
)
// WorkerService owns worker lifecycle operations and task hand-off. It sits
// between the worker registry (the pool of known workers) and the work queue
// (tasks awaiting execution), and can optionally record to a build audit.
type WorkerService struct {
	// registry stores workers and their status/heartbeat state.
	registry port.WorkerRegistry
	// queue hands out tasks to claim and accepts their completion.
	queue port.WorkQueue
	// audit is optional: NewWorkerService leaves it nil and every use in this
	// type is guarded by a nil check. Set it with WithBuildAudit.
	audit port.BuildAudit
	// logger is the structured logger used by all methods of the service.
	logger *slog.Logger
}
// NewWorkerService builds a WorkerService on top of the given registry and
// queue. A nil logger falls back to slog.Default(); whichever logger is used,
// the service stores a derived logger tagged with service=worker. No build
// audit is attached by the constructor.
func NewWorkerService(
	registry port.WorkerRegistry,
	queue port.WorkQueue,
	logger *slog.Logger,
) *WorkerService {
	base := logger
	if base == nil {
		base = slog.Default()
	}

	svc := &WorkerService{
		registry: registry,
		queue:    queue,
	}
	svc.logger = base.With("service", "worker")
	return svc
}
// WithBuildAudit attaches a build audit to the service. It mutates the
// receiver in place and returns that same receiver so the call can be chained
// onto the constructor.
func (s *WorkerService) WithBuildAudit(audit port.BuildAudit) *WorkerService {
	s.audit = audit

	return s
}
// Register validates worker, stamps its registration metadata and adds it to
// the pool.
//
// After validation succeeds, the worker's RegisteredAt and LastHeartbeat are
// set to the same instant and its Status to domain.WorkerStatusIdle. These
// fields are written on the caller's struct before the registry is called, so
// they remain modified even if the registry call fails. Validation and
// registry errors are returned unchanged.
func (s *WorkerService) Register(ctx context.Context, worker *domain.Worker) error {
	if err := worker.Validate(); err != nil {
		return err
	}

	// Take a single clock reading so both timestamps agree exactly; two
	// separate time.Now() calls would leave LastHeartbeat slightly later than
	// RegisteredAt.
	now := time.Now()
	worker.RegisteredAt = now
	worker.LastHeartbeat = now
	worker.Status = domain.WorkerStatusIdle

	if err := s.registry.Register(ctx, worker); err != nil {
		return err
	}

	s.logger.Info("worker registered",
		"worker_id", worker.ID,
		"hostname", worker.Hostname,
		"version", worker.Version,
		"capabilities", worker.Capabilities,
	)
	return nil
}
// Heartbeat records a liveness signal for workerID by delegating to the
// registry. The registry's error, if any, is returned as-is.
func (s *WorkerService) Heartbeat(ctx context.Context, workerID string) error {
	return s.registry.Heartbeat(ctx, workerID)
}
// Deregister removes workerID from the pool via the registry. A successful
// removal is logged at info level; a registry error is returned unchanged and
// nothing is logged.
func (s *WorkerService) Deregister(ctx context.Context, workerID string) error {
	err := s.registry.Deregister(ctx, workerID)
	if err == nil {
		s.logger.Info("worker deregistered", "worker_id", workerID)
	}
	return err
}
// GetWorker looks up a single worker by ID in the registry and returns the
// registry's result and error unchanged.
func (s *WorkerService) GetWorker(ctx context.Context, workerID string) (*domain.Worker, error) {
	worker, err := s.registry.Get(ctx, workerID)
	return worker, err
}
// ListWorkers asks the registry for the workers selected by filter and
// returns the registry's result and error unchanged.
func (s *WorkerService) ListWorkers(ctx context.Context, filter port.WorkerFilter) ([]*domain.Worker, error) {
	workers, err := s.registry.List(ctx, filter)
	return workers, err
}
// ClaimTask dequeues the next task for workerID and then records the claim.
//
// It returns (nil, err) when dequeuing fails and (nil, nil) when the queue
// hands back no task. Once a task has been dequeued the claim is treated as
// successful: marking the worker busy in the registry and, when a build audit
// is attached, persisting domain.BuildStatusRunning for the task are
// follow-up steps whose failures are only logged as warnings. These are
// separate calls, so this method by itself does not make the dequeue and the
// status updates a single atomic operation.
func (s *WorkerService) ClaimTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
	task, err := s.queue.Dequeue(ctx, workerID)
	switch {
	case err != nil:
		return nil, err
	case task == nil:
		// An empty queue is not an error.
		return nil, nil
	}

	// Best effort: the task is already owned by this worker, so a registry
	// failure must not lose it.
	busyErr := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusBusy, task.ID)
	if busyErr != nil {
		s.logger.Warn("failed to update worker status after claim",
			"worker_id", workerID,
			"task_id", task.ID,
			"error", busyErr,
		)
	}

	// Persist the running status to the audit store when one is configured.
	if s.audit != nil {
		auditErr := s.audit.UpdateStatus(ctx, task.ID, domain.BuildStatusRunning, workerID)
		if auditErr != nil {
			s.logger.Warn("failed to update audit status after claim",
				"task_id", task.ID,
				"worker_id", workerID,
				"error", auditErr,
			)
		}
	}

	s.logger.Info("task claimed",
		"task_id", task.ID,
		"worker_id", workerID,
		"project_id", task.ProjectID,
		"type", task.Type,
	)
	return task, nil
}
// CompleteTask records the outcome of taskID and releases workerID.
//
// A nil result is replaced by a zero-valued domain.BuildResult. The steps run
// in this order:
//  1. the build audit, if attached, is updated with result (failure is only
//     logged);
//  2. the task is completed in the queue with a work result carrying the
//     Output and Artifacts of result.ToWorkResult() (failure is returned);
//  3. the worker is set back to domain.WorkerStatusIdle with no current task
//     (failure is only logged).
//
// NOTE(review): because the audit is written before the queue call, a queue
// failure returns an error with the audit already updated and the worker not
// returned to idle — confirm this ordering is intended.
func (s *WorkerService) CompleteTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult) error {
	if result == nil {
		result = &domain.BuildResult{}
	}

	// Only Output and Artifacts are carried over from the converted result.
	converted := result.ToWorkResult()
	queueResult := &domain.WorkResult{
		Output:    converted.Output,
		Artifacts: converted.Artifacts,
	}

	// Audit bookkeeping is non-critical: log and carry on.
	if s.audit != nil {
		auditErr := s.audit.Update(ctx, taskID, result)
		if auditErr != nil {
			s.logger.Warn("failed to update audit",
				"task_id", taskID,
				"error", auditErr,
			)
		}
	}

	// The queue is authoritative for completion; its failure aborts the call.
	err := s.queue.Complete(ctx, taskID, queueResult)
	if err != nil {
		return err
	}

	// Free the worker for new tasks; a failure here is logged, not returned.
	idleErr := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusIdle, "")
	if idleErr != nil {
		s.logger.Warn("failed to return worker to idle",
			"worker_id", workerID,
			"error", idleErr,
		)
	}

	s.logger.Info("task completed",
		"task_id", taskID,
		"worker_id", workerID,
		"success", result.Success,
		"duration_ms", result.DurationMs,
	)
	return nil
}
// DrainWorker switches workerID to domain.WorkerStatusDraining while keeping
// the current task recorded on the worker, so in-flight work is preserved.
// Errors from the registry lookup or the status update are returned unchanged.
//
// NOTE(review): assumes registry.Get never returns a nil worker together with
// a nil error; that combination would panic below — confirm against the
// registry implementations.
func (s *WorkerService) DrainWorker(ctx context.Context, workerID string) error {
	worker, err := s.registry.Get(ctx, workerID)
	if err != nil {
		return err
	}

	// Pass the existing task through so the status change does not clear it.
	err = s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusDraining, worker.CurrentTask)
	if err != nil {
		return err
	}

	s.logger.Info("worker draining",
		"worker_id", workerID,
		"current_task", worker.CurrentTask,
	)
	return nil
}
// StartHealthChecker runs the stale-worker sweep loop. Every
// DefaultHeartbeatInterval it asks the registry to mark workers whose last
// heartbeat is older than DefaultStaleThreshold as offline.
//
// The loop runs on the calling goroutine and blocks until ctx is cancelled;
// the function does not spawn a goroutine itself, so callers that want it in
// the background must invoke it with `go`. Sweep errors are logged and do not
// stop the loop.
func (s *WorkerService) StartHealthChecker(ctx context.Context) {
	ticker := time.NewTicker(DefaultHeartbeatInterval)
	defer ticker.Stop()

	s.logger.Info("worker health checker started",
		"interval", DefaultHeartbeatInterval,
		"stale_threshold", DefaultStaleThreshold,
	)

	for {
		// Wait for either shutdown or the next tick.
		select {
		case <-ctx.Done():
			s.logger.Info("worker health checker stopped")
			return
		case <-ticker.C:
		}

		marked, sweepErr := s.registry.MarkStaleOffline(ctx, DefaultStaleThreshold)
		switch {
		case sweepErr != nil:
			s.logger.Error("failed to mark stale workers", "error", sweepErr)
		case marked > 0:
			// Only report sweeps that actually changed something.
			s.logger.Warn("marked workers offline", "count", marked)
		}
	}
}