When a worker dies mid-build, queue maintenance now updates both work_queue and build_audit tables when requeuing stale tasks. This prevents builds from showing "running" forever in the API. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
274 lines
6.6 KiB
Go
274 lines
6.6 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// mockWorkQueue implements port.WorkQueue for service tests.
|
|
// Configure tasks and err fields to control behavior.
|
|
type mockWorkQueue struct {
|
|
tasks map[string]*domain.WorkTask
|
|
err error
|
|
}
|
|
|
|
func newMockWorkQueue() *mockWorkQueue {
|
|
return &mockWorkQueue{tasks: make(map[string]*domain.WorkTask)}
|
|
}
|
|
|
|
func (m *mockWorkQueue) Enqueue(ctx context.Context, task *domain.WorkTask) (string, error) {
|
|
if m.err != nil {
|
|
return "", m.err
|
|
}
|
|
id := fmt.Sprintf("task-%d", len(m.tasks)+1)
|
|
task.ID = id
|
|
task.Status = domain.WorkTaskStatusPending
|
|
task.CreatedAt = time.Now()
|
|
m.tasks[id] = task
|
|
return id, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Dequeue(ctx context.Context, workerID string) (*domain.WorkTask, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
for _, task := range m.tasks {
|
|
if task.Status == domain.WorkTaskStatusPending {
|
|
task.Status = domain.WorkTaskStatusRunning
|
|
task.WorkerID = workerID
|
|
now := time.Now()
|
|
task.StartedAt = &now
|
|
return task, nil
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *domain.WorkResult) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
task.Status = domain.WorkTaskStatusCompleted
|
|
task.Result = result
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error {
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Cancel(ctx context.Context, taskID string) error {
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error) {
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return nil, domain.ErrWorkTaskNotFound
|
|
}
|
|
return task, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
|
return &domain.WorkListResult{}, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) GetStats(ctx context.Context) (*domain.WorkQueueStats, error) {
|
|
return &domain.WorkQueueStats{}, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) CleanupOld(ctx context.Context, olderThan time.Duration) (int64, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// mockBuildAudit implements port.BuildAudit for service tests.
|
|
type mockBuildAudit struct {
|
|
entries map[string]*domain.BuildAuditEntry
|
|
err error
|
|
}
|
|
|
|
func newMockBuildAudit() *mockBuildAudit {
|
|
return &mockBuildAudit{entries: make(map[string]*domain.BuildAuditEntry)}
|
|
}
|
|
|
|
func (m *mockBuildAudit) Record(ctx context.Context, entry *domain.BuildAuditEntry) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
m.entries[entry.TaskID] = entry
|
|
return nil
|
|
}
|
|
|
|
func (m *mockBuildAudit) Update(ctx context.Context, taskID string, result *domain.BuildResult) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
entry, ok := m.entries[taskID]
|
|
if !ok {
|
|
return domain.ErrBuildNotFound
|
|
}
|
|
entry.Result = result
|
|
if result.Success {
|
|
entry.Status = domain.BuildStatusCompleted
|
|
} else {
|
|
entry.Status = domain.BuildStatusFailed
|
|
}
|
|
now := time.Now()
|
|
entry.CompletedAt = &now
|
|
return nil
|
|
}
|
|
|
|
func (m *mockBuildAudit) UpdateStatus(ctx context.Context, taskID string, status domain.BuildStatus, workerID string) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
entry, ok := m.entries[taskID]
|
|
if !ok {
|
|
return domain.ErrBuildNotFound
|
|
}
|
|
entry.Status = status
|
|
entry.WorkerID = workerID
|
|
return nil
|
|
}
|
|
|
|
func (m *mockBuildAudit) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
entry, ok := m.entries[taskID]
|
|
if !ok {
|
|
return nil, domain.ErrBuildNotFound
|
|
}
|
|
return entry, nil
|
|
}
|
|
|
|
func (m *mockBuildAudit) List(ctx context.Context, filter port.BuildAuditFilter) ([]*domain.BuildAuditEntry, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
var result []*domain.BuildAuditEntry
|
|
for _, entry := range m.entries {
|
|
if filter.ProjectID != "" && entry.ProjectID != filter.ProjectID {
|
|
continue
|
|
}
|
|
result = append(result, entry)
|
|
}
|
|
if filter.Limit > 0 && len(result) > filter.Limit {
|
|
result = result[:filter.Limit]
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// mockWorkerRegistry implements port.WorkerRegistry for service tests.
|
|
type mockWorkerRegistry struct {
|
|
workers map[string]*domain.Worker
|
|
err error
|
|
}
|
|
|
|
func newMockWorkerRegistry() *mockWorkerRegistry {
|
|
return &mockWorkerRegistry{workers: make(map[string]*domain.Worker)}
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) Register(ctx context.Context, worker *domain.Worker) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
m.workers[worker.ID] = worker
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) Heartbeat(ctx context.Context, workerID string) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
w, ok := m.workers[workerID]
|
|
if !ok {
|
|
return domain.ErrWorkerNotFound
|
|
}
|
|
w.LastHeartbeat = time.Now()
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) UpdateStatus(ctx context.Context, workerID string, status domain.WorkerStatus, taskID string) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
w, ok := m.workers[workerID]
|
|
if !ok {
|
|
return domain.ErrWorkerNotFound
|
|
}
|
|
w.Status = status
|
|
w.CurrentTask = taskID
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) Deregister(ctx context.Context, workerID string) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
if _, ok := m.workers[workerID]; !ok {
|
|
return domain.ErrWorkerNotFound
|
|
}
|
|
delete(m.workers, workerID)
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) Get(ctx context.Context, workerID string) (*domain.Worker, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
w, ok := m.workers[workerID]
|
|
if !ok {
|
|
return nil, domain.ErrWorkerNotFound
|
|
}
|
|
return w, nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) List(ctx context.Context, filter port.WorkerFilter) ([]*domain.Worker, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
var result []*domain.Worker
|
|
for _, w := range m.workers {
|
|
if filter.Status != nil && w.Status != *filter.Status {
|
|
continue
|
|
}
|
|
result = append(result, w)
|
|
}
|
|
if filter.Limit > 0 && len(result) > filter.Limit {
|
|
result = result[:filter.Limit]
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (m *mockWorkerRegistry) MarkStaleOffline(ctx context.Context, threshold time.Duration) (int, error) {
|
|
if m.err != nil {
|
|
return 0, m.err
|
|
}
|
|
count := 0
|
|
for _, w := range m.workers {
|
|
if w.Status != domain.WorkerStatusOffline && time.Since(w.LastHeartbeat) > threshold {
|
|
w.Status = domain.WorkerStatusOffline
|
|
w.CurrentTask = ""
|
|
count++
|
|
}
|
|
}
|
|
return count, nil
|
|
}
|