fix: Persist build audit status when worker claims task
Root cause: WorkerService.ClaimTask() modified the audit entry only in memory and never persisted the change to the database, so build tasks remained stuck at "pending" status even after a worker had claimed them.

Changes:
- Add UpdateStatus method to the port.BuildAudit interface
- Implement UpdateStatus in postgres.BuildAuditRepository
- Fix ClaimTask to call audit.UpdateStatus() to persist the status
- Add test coverage for the audit update during task claim
- Update all mock implementations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
d505aba804
commit
4a18b1cd07
@ -83,6 +83,28 @@ func (r *BuildAuditRepository) Update(ctx context.Context, taskID string, result
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateStatus updates the status and worker assignment when a task is claimed.
|
||||||
|
func (r *BuildAuditRepository) UpdateStatus(ctx context.Context, taskID string, status domain.BuildStatus, workerID string) error {
|
||||||
|
res, err := r.db.ExecContext(ctx, `
|
||||||
|
UPDATE build_audit
|
||||||
|
SET status = $2, worker_id = $3
|
||||||
|
WHERE task_id = $1
|
||||||
|
`, taskID, status, nullString(workerID))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("update build audit status: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := res.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rows affected: %w", err)
|
||||||
|
}
|
||||||
|
if rows == 0 {
|
||||||
|
return domain.ErrBuildNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get retrieves a specific audit entry by task ID.
|
// Get retrieves a specific audit entry by task ID.
|
||||||
func (r *BuildAuditRepository) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
func (r *BuildAuditRepository) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
||||||
rows, err := r.db.QueryContext(ctx, `
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
|
|||||||
@ -181,6 +181,83 @@ func TestBuildAuditRepository_Update(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBuildAuditRepository_UpdateStatus(t *testing.T) {
|
||||||
|
db := testutil.TestDB(t)
|
||||||
|
t.Cleanup(func() { cleanupTestBuildAudit(t, db) })
|
||||||
|
|
||||||
|
repo := NewBuildAuditRepository(db)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Create initial entry
|
||||||
|
entry := &domain.BuildAuditEntry{
|
||||||
|
TaskID: "test-task-status-1",
|
||||||
|
ProjectID: "test-project-1",
|
||||||
|
Spec: domain.BuildSpec{Prompt: "Build"},
|
||||||
|
Status: domain.BuildStatusPending,
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
if err := repo.Record(ctx, entry); err != nil {
|
||||||
|
t.Fatalf("Record() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("updates status and worker ID", func(t *testing.T) {
|
||||||
|
err := repo.UpdateStatus(ctx, "test-task-status-1", domain.BuildStatusRunning, "worker-123")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("UpdateStatus() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := repo.Get(ctx, "test-task-status-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Get() error = %v", err)
|
||||||
|
}
|
||||||
|
if got.Status != domain.BuildStatusRunning {
|
||||||
|
t.Errorf("got status %q, want %q", got.Status, domain.BuildStatusRunning)
|
||||||
|
}
|
||||||
|
if got.WorkerID != "worker-123" {
|
||||||
|
t.Errorf("got worker_id %q, want %q", got.WorkerID, "worker-123")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("updates status with empty worker ID", func(t *testing.T) {
|
||||||
|
// Create another entry
|
||||||
|
entry := &domain.BuildAuditEntry{
|
||||||
|
TaskID: "test-task-status-2",
|
||||||
|
ProjectID: "test-project-1",
|
||||||
|
WorkerID: "old-worker",
|
||||||
|
Spec: domain.BuildSpec{Prompt: "Build"},
|
||||||
|
Status: domain.BuildStatusRunning,
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
if err := repo.Record(ctx, entry); err != nil {
|
||||||
|
t.Fatalf("Record() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := repo.UpdateStatus(ctx, "test-task-status-2", domain.BuildStatusCompleted, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("UpdateStatus() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := repo.Get(ctx, "test-task-status-2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Get() error = %v", err)
|
||||||
|
}
|
||||||
|
if got.Status != domain.BuildStatusCompleted {
|
||||||
|
t.Errorf("got status %q, want %q", got.Status, domain.BuildStatusCompleted)
|
||||||
|
}
|
||||||
|
// WorkerID should be cleared when empty string is passed
|
||||||
|
if got.WorkerID != "" {
|
||||||
|
t.Errorf("got worker_id %q, want empty", got.WorkerID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("returns error for nonexistent task", func(t *testing.T) {
|
||||||
|
err := repo.UpdateStatus(ctx, "test-task-nonexistent", domain.BuildStatusRunning, "worker-1")
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error for nonexistent task")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestBuildAuditRepository_Get(t *testing.T) {
|
func TestBuildAuditRepository_Get(t *testing.T) {
|
||||||
db := testutil.TestDB(t)
|
db := testutil.TestDB(t)
|
||||||
t.Cleanup(func() { cleanupTestBuildAudit(t, db) })
|
t.Cleanup(func() { cleanupTestBuildAudit(t, db) })
|
||||||
|
|||||||
@ -66,6 +66,19 @@ func (m *mockBuildAudit) Update(_ context.Context, taskID string, result *domain
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockBuildAudit) UpdateStatus(_ context.Context, taskID string, status domain.BuildStatus, workerID string) error {
|
||||||
|
if m.err != nil {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
entry, ok := m.entries[taskID]
|
||||||
|
if !ok {
|
||||||
|
return domain.ErrBuildNotFound
|
||||||
|
}
|
||||||
|
entry.Status = status
|
||||||
|
entry.WorkerID = workerID
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockBuildAudit) Get(_ context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
func (m *mockBuildAudit) Get(_ context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
||||||
if m.err != nil {
|
if m.err != nil {
|
||||||
return nil, m.err
|
return nil, m.err
|
||||||
|
|||||||
@ -19,6 +19,10 @@ type BuildAudit interface {
|
|||||||
// Update modifies an existing entry when a build completes.
|
// Update modifies an existing entry when a build completes.
|
||||||
Update(ctx context.Context, taskID string, result *domain.BuildResult) error
|
Update(ctx context.Context, taskID string, result *domain.BuildResult) error
|
||||||
|
|
||||||
|
// UpdateStatus updates the status and worker assignment when a task is claimed.
|
||||||
|
// This is called when a worker picks up a task to mark it as running.
|
||||||
|
UpdateStatus(ctx context.Context, taskID string, status domain.BuildStatus, workerID string) error
|
||||||
|
|
||||||
// Get retrieves a specific audit entry by task ID.
|
// Get retrieves a specific audit entry by task ID.
|
||||||
// Returns ErrBuildNotFound if the entry does not exist.
|
// Returns ErrBuildNotFound if the entry does not exist.
|
||||||
Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error)
|
Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error)
|
||||||
|
|||||||
@ -130,6 +130,19 @@ func (m *mockBuildAudit) Update(ctx context.Context, taskID string, result *doma
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockBuildAudit) UpdateStatus(ctx context.Context, taskID string, status domain.BuildStatus, workerID string) error {
|
||||||
|
if m.err != nil {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
entry, ok := m.entries[taskID]
|
||||||
|
if !ok {
|
||||||
|
return domain.ErrBuildNotFound
|
||||||
|
}
|
||||||
|
entry.Status = status
|
||||||
|
entry.WorkerID = workerID
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockBuildAudit) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
func (m *mockBuildAudit) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
||||||
if m.err != nil {
|
if m.err != nil {
|
||||||
return nil, m.err
|
return nil, m.err
|
||||||
|
|||||||
@ -118,12 +118,14 @@ func (s *WorkerService) ClaimTask(ctx context.Context, workerID string) (*domain
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update audit entry if available
|
// Update audit entry if available - persist status change to database
|
||||||
if s.audit != nil {
|
if s.audit != nil {
|
||||||
entry, _ := s.audit.Get(ctx, task.ID)
|
if err := s.audit.UpdateStatus(ctx, task.ID, domain.BuildStatusRunning, workerID); err != nil {
|
||||||
if entry != nil {
|
s.logger.Warn("failed to update audit status after claim",
|
||||||
entry.WorkerID = workerID
|
"task_id", task.ID,
|
||||||
entry.Status = domain.BuildStatusRunning
|
"worker_id", workerID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -174,6 +174,50 @@ func TestWorkerService_ClaimTask(t *testing.T) {
|
|||||||
t.Error("expected nil task when queue is empty")
|
t.Error("expected nil task when queue is empty")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("updates audit status when claiming task", func(t *testing.T) {
|
||||||
|
registry := newMockWorkerRegistry()
|
||||||
|
registry.workers["worker-1"] = &domain.Worker{
|
||||||
|
ID: "worker-1",
|
||||||
|
Hostname: "host-1",
|
||||||
|
Status: domain.WorkerStatusIdle,
|
||||||
|
}
|
||||||
|
|
||||||
|
queue := newMockWorkQueue()
|
||||||
|
queue.tasks["task-1"] = &domain.WorkTask{
|
||||||
|
ID: "task-1",
|
||||||
|
ProjectID: "project-1",
|
||||||
|
Type: domain.WorkTaskTypeBuild,
|
||||||
|
Status: domain.WorkTaskStatusPending,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
audit := newMockBuildAudit()
|
||||||
|
audit.entries["task-1"] = &domain.BuildAuditEntry{
|
||||||
|
TaskID: "task-1",
|
||||||
|
ProjectID: "project-1",
|
||||||
|
Status: domain.BuildStatusPending,
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := NewWorkerService(registry, queue, nil).WithBuildAudit(audit)
|
||||||
|
|
||||||
|
task, err := svc.ClaimTask(ctx, "worker-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ClaimTask() error = %v", err)
|
||||||
|
}
|
||||||
|
if task == nil {
|
||||||
|
t.Fatal("expected task to be returned")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify audit was updated
|
||||||
|
entry := audit.entries["task-1"]
|
||||||
|
if entry.Status != domain.BuildStatusRunning {
|
||||||
|
t.Errorf("got audit status %q, want %q", entry.Status, domain.BuildStatusRunning)
|
||||||
|
}
|
||||||
|
if entry.WorkerID != "worker-1" {
|
||||||
|
t.Errorf("got audit worker_id %q, want %q", entry.WorkerID, "worker-1")
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWorkerService_CompleteTask(t *testing.T) {
|
func TestWorkerService_CompleteTask(t *testing.T) {
|
||||||
|
|||||||
@ -221,6 +221,18 @@ func (m *mockBuildAudit) Update(_ context.Context, taskID string, result *domain
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockBuildAudit) UpdateStatus(_ context.Context, taskID string, status domain.BuildStatus, workerID string) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
entry, ok := m.entries[taskID]
|
||||||
|
if !ok {
|
||||||
|
return domain.ErrBuildNotFound
|
||||||
|
}
|
||||||
|
entry.Status = status
|
||||||
|
entry.WorkerID = workerID
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockBuildAudit) Get(_ context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
func (m *mockBuildAudit) Get(_ context.Context, taskID string) (*domain.BuildAuditEntry, error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
@ -308,7 +320,7 @@ func newTestDeps() *testDeps {
|
|||||||
WithBuildAudit(audit)
|
WithBuildAudit(audit)
|
||||||
workSvc := service.NewWorkService(queue, service.WorkServiceConfig{})
|
workSvc := service.NewWorkService(queue, service.WorkServiceConfig{})
|
||||||
|
|
||||||
buildExec := NewBuildExecutor(agentRegistry, nil, nil)
|
buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil)
|
||||||
|
|
||||||
return &testDeps{
|
return &testDeps{
|
||||||
queue: queue,
|
queue: queue,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user