Root cause: WorkerService.ClaimTask() modified the audit entry in memory but never persisted it to the database, so build tasks remained stuck at "pending" status even after being claimed.

Changes:
- Add an UpdateStatus method to the port.BuildAudit interface
- Implement UpdateStatus in postgres.BuildAuditRepository
- Fix ClaimTask to call audit.UpdateStatus() so the status is persisted
- Add test coverage for the audit update during task claim
- Update all mock implementations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
234 lines
6.0 KiB
Go
234 lines
6.0 KiB
Go
// Package service provides business logic services.
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// Timing used by StartHealthChecker. The stale threshold is three times the
// check interval.
const (
	// DefaultHeartbeatInterval is the period between health-checker sweeps.
	DefaultHeartbeatInterval time.Duration = 30 * time.Second

	// DefaultStaleThreshold is the maximum age of a worker's last heartbeat
	// before the health checker marks that worker offline.
	DefaultStaleThreshold time.Duration = 90 * time.Second
)
|
|
|
|
// WorkerService manages worker lifecycle and task assignment.
// It coordinates between the worker registry (pool management) and
// the work queue (task execution).
type WorkerService struct {
	// registry holds the worker pool: registration, heartbeats, per-worker
	// status/current task, and the stale-worker sweep.
	registry port.WorkerRegistry

	// queue hands out tasks (Dequeue) and records their results (Complete).
	queue port.WorkQueue

	// audit is optional: nil unless set via WithBuildAudit. When non-nil it
	// receives a running-status update on claim and the build result on
	// completion; failures there are logged, never returned.
	audit port.BuildAudit

	// logger is scoped with service=worker when built by NewWorkerService.
	logger *slog.Logger
}
|
|
|
|
// NewWorkerService creates a new worker service.
|
|
func NewWorkerService(
|
|
registry port.WorkerRegistry,
|
|
queue port.WorkQueue,
|
|
logger *slog.Logger,
|
|
) *WorkerService {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
return &WorkerService{
|
|
registry: registry,
|
|
queue: queue,
|
|
logger: logger.With("service", "worker"),
|
|
}
|
|
}
|
|
|
|
// WithBuildAudit adds a build audit for recording task assignments.
|
|
func (s *WorkerService) WithBuildAudit(audit port.BuildAudit) *WorkerService {
|
|
s.audit = audit
|
|
return s
|
|
}
|
|
|
|
// Register adds a worker to the pool.
|
|
func (s *WorkerService) Register(ctx context.Context, worker *domain.Worker) error {
|
|
if err := worker.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
worker.RegisteredAt = time.Now()
|
|
worker.LastHeartbeat = time.Now()
|
|
worker.Status = domain.WorkerStatusIdle
|
|
|
|
if err := s.registry.Register(ctx, worker); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.logger.Info("worker registered",
|
|
"worker_id", worker.ID,
|
|
"hostname", worker.Hostname,
|
|
"version", worker.Version,
|
|
"capabilities", worker.Capabilities,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Heartbeat updates worker liveness.
|
|
func (s *WorkerService) Heartbeat(ctx context.Context, workerID string) error {
|
|
return s.registry.Heartbeat(ctx, workerID)
|
|
}
|
|
|
|
// Deregister removes a worker from the pool.
|
|
func (s *WorkerService) Deregister(ctx context.Context, workerID string) error {
|
|
if err := s.registry.Deregister(ctx, workerID); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.logger.Info("worker deregistered", "worker_id", workerID)
|
|
return nil
|
|
}
|
|
|
|
// GetWorker retrieves a specific worker.
|
|
func (s *WorkerService) GetWorker(ctx context.Context, workerID string) (*domain.Worker, error) {
|
|
return s.registry.Get(ctx, workerID)
|
|
}
|
|
|
|
// ListWorkers returns all workers matching the optional filter.
|
|
func (s *WorkerService) ListWorkers(ctx context.Context, filter port.WorkerFilter) ([]*domain.Worker, error) {
|
|
return s.registry.List(ctx, filter)
|
|
}
|
|
|
|
// ClaimTask atomically dequeues a task and marks worker as busy.
|
|
func (s *WorkerService) ClaimTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
|
|
task, err := s.queue.Dequeue(ctx, workerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if task == nil {
|
|
return nil, nil // No tasks available
|
|
}
|
|
|
|
// Mark worker as busy with the claimed task
|
|
if err := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusBusy, task.ID); err != nil {
|
|
s.logger.Warn("failed to update worker status after claim",
|
|
"worker_id", workerID,
|
|
"task_id", task.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
// Update audit entry if available - persist status change to database
|
|
if s.audit != nil {
|
|
if err := s.audit.UpdateStatus(ctx, task.ID, domain.BuildStatusRunning, workerID); err != nil {
|
|
s.logger.Warn("failed to update audit status after claim",
|
|
"task_id", task.ID,
|
|
"worker_id", workerID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
s.logger.Info("task claimed",
|
|
"task_id", task.ID,
|
|
"worker_id", workerID,
|
|
"project_id", task.ProjectID,
|
|
"type", task.Type,
|
|
)
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// CompleteTask marks a task as complete and returns worker to idle.
|
|
func (s *WorkerService) CompleteTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult) error {
|
|
if result == nil {
|
|
result = &domain.BuildResult{}
|
|
}
|
|
|
|
// Convert domain build result to work result
|
|
bwr := result.ToWorkResult()
|
|
workResult := &domain.WorkResult{
|
|
Output: bwr.Output,
|
|
Artifacts: bwr.Artifacts,
|
|
}
|
|
|
|
// Update audit record (non-critical)
|
|
if s.audit != nil {
|
|
if err := s.audit.Update(ctx, taskID, result); err != nil {
|
|
s.logger.Warn("failed to update audit",
|
|
"task_id", taskID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Complete in queue
|
|
if err := s.queue.Complete(ctx, taskID, workResult); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Return worker to idle
|
|
if err := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusIdle, ""); err != nil {
|
|
s.logger.Warn("failed to return worker to idle",
|
|
"worker_id", workerID,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
s.logger.Info("task completed",
|
|
"task_id", taskID,
|
|
"worker_id", workerID,
|
|
"success", result.Success,
|
|
"duration_ms", result.DurationMs,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// DrainWorker sets a worker to draining status so it finishes current work
|
|
// but doesn't accept new tasks.
|
|
func (s *WorkerService) DrainWorker(ctx context.Context, workerID string) error {
|
|
worker, err := s.registry.Get(ctx, workerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusDraining, worker.CurrentTask); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.logger.Info("worker draining",
|
|
"worker_id", workerID,
|
|
"current_task", worker.CurrentTask,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// StartHealthChecker runs a background goroutine that marks stale workers offline.
|
|
// It returns when the context is cancelled.
|
|
func (s *WorkerService) StartHealthChecker(ctx context.Context) {
|
|
ticker := time.NewTicker(DefaultHeartbeatInterval)
|
|
defer ticker.Stop()
|
|
|
|
s.logger.Info("worker health checker started",
|
|
"interval", DefaultHeartbeatInterval,
|
|
"stale_threshold", DefaultStaleThreshold,
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
s.logger.Info("worker health checker stopped")
|
|
return
|
|
case <-ticker.C:
|
|
count, err := s.registry.MarkStaleOffline(ctx, DefaultStaleThreshold)
|
|
if err != nil {
|
|
s.logger.Error("failed to mark stale workers", "error", err)
|
|
} else if count > 0 {
|
|
s.logger.Warn("marked workers offline", "count", count)
|
|
}
|
|
}
|
|
}
|
|
}
|