// Package postgres provides PostgreSQL-based implementations of port interfaces. package postgres import ( "context" "database/sql" "encoding/json" "fmt" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // BuildAuditRepository implements port.BuildAudit using PostgreSQL. type BuildAuditRepository struct { db *sql.DB } // NewBuildAuditRepository creates a new PostgreSQL build audit repository. func NewBuildAuditRepository(db *sql.DB) *BuildAuditRepository { return &BuildAuditRepository{db: db} } // Ensure BuildAuditRepository implements port.BuildAudit at compile time. var _ port.BuildAudit = (*BuildAuditRepository)(nil) // Record creates a new audit entry when a build starts. func (r *BuildAuditRepository) Record(ctx context.Context, entry *domain.BuildAuditEntry) error { specJSON, err := json.Marshal(entry.Spec) if err != nil { return fmt.Errorf("marshal build spec: %w", err) } _, err = r.db.ExecContext(ctx, ` INSERT INTO build_audit (task_id, project_id, worker_id, spec, status, started_at) VALUES ($1, $2, $3, $4, $5, $6) `, entry.TaskID, entry.ProjectID, nullString(entry.WorkerID), specJSON, entry.Status, entry.StartedAt) if err != nil { return fmt.Errorf("record build audit: %w", err) } return nil } // Update modifies an existing entry when a build completes. func (r *BuildAuditRepository) Update(ctx context.Context, taskID string, result *domain.BuildResult) error { var resultJSON []byte var err error if result != nil { resultJSON, err = json.Marshal(result) if err != nil { return fmt.Errorf("marshal build result: %w", err) } } status := domain.BuildStatusCompleted if result != nil && !result.Success { status = domain.BuildStatusFailed } now := time.Now() res, err := r.db.ExecContext(ctx, ` UPDATE build_audit SET result = $2, status = $3, completed_at = $4 WHERE task_id = $1 `, taskID, resultJSON, status, now) if err != nil { return fmt.Errorf("update build audit: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrBuildNotFound } return nil } // UpdateStatus updates the status and worker assignment when a task is claimed. // When status is "running" (task claimed for execution/retry), this also clears // any stale result and completed_at from previous failed attempts. func (r *BuildAuditRepository) UpdateStatus(ctx context.Context, taskID string, status domain.BuildStatus, workerID string) error { // When a task transitions to running (claimed for execution or retry), // clear stale result data from any previous failed attempts. var query string if status == domain.BuildStatusRunning { query = ` UPDATE build_audit SET status = $2, worker_id = $3, result = NULL, completed_at = NULL, started_at = NOW() WHERE task_id = $1 ` } else { query = ` UPDATE build_audit SET status = $2, worker_id = $3 WHERE task_id = $1 ` } res, err := r.db.ExecContext(ctx, query, taskID, status, nullString(workerID)) if err != nil { return fmt.Errorf("update build audit status: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrBuildNotFound } return nil } // Get retrieves a specific audit entry by task ID. func (r *BuildAuditRepository) Get(ctx context.Context, taskID string) (*domain.BuildAuditEntry, error) { rows, err := r.db.QueryContext(ctx, ` SELECT task_id, project_id, worker_id, spec, result, status, started_at, completed_at FROM build_audit WHERE task_id = $1 `, taskID) if err != nil { return nil, fmt.Errorf("get build audit: %w", err) } defer func() { _ = rows.Close() }() if !rows.Next() { if err := rows.Err(); err != nil { return nil, fmt.Errorf("get build audit: %w", err) } return nil, domain.ErrBuildNotFound } return r.scanEntry(rows) } // List returns audit entries matching the filter. func (r *BuildAuditRepository) List(ctx context.Context, filter port.BuildAuditFilter) ([]*domain.BuildAuditEntry, error) { query := ` SELECT task_id, project_id, worker_id, spec, result, status, started_at, completed_at FROM build_audit WHERE 1=1` args := []any{} argNum := 1 if filter.ProjectID != "" { query += fmt.Sprintf(" AND project_id = $%d", argNum) args = append(args, filter.ProjectID) argNum++ } if filter.WorkerID != "" { query += fmt.Sprintf(" AND worker_id = $%d", argNum) args = append(args, filter.WorkerID) argNum++ } if filter.Status != nil { query += fmt.Sprintf(" AND status = $%d", argNum) args = append(args, string(*filter.Status)) argNum++ } if !filter.Since.IsZero() { query += fmt.Sprintf(" AND started_at >= $%d", argNum) args = append(args, filter.Since) argNum++ } query += " ORDER BY started_at DESC" if filter.Limit > 0 { query += fmt.Sprintf(" LIMIT $%d", argNum) args = append(args, filter.Limit) } rows, err := r.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("list build audit: %w", err) } defer func() { _ = rows.Close() }() var entries []*domain.BuildAuditEntry for rows.Next() { entry, err := r.scanEntry(rows) if err != nil { return nil, err } entries = append(entries, entry) } return entries, rows.Err() } // scanEntry scans a single build audit row from a query result. func (r *BuildAuditRepository) scanEntry(rows *sql.Rows) (*domain.BuildAuditEntry, error) { var entry domain.BuildAuditEntry var workerID sql.NullString var specJSON []byte var resultJSON []byte var completedAt sql.NullTime err := rows.Scan( &entry.TaskID, &entry.ProjectID, &workerID, &specJSON, &resultJSON, &entry.Status, &entry.StartedAt, &completedAt, ) if err != nil { return nil, fmt.Errorf("scan build audit: %w", err) } if workerID.Valid { entry.WorkerID = workerID.String } if completedAt.Valid { entry.CompletedAt = &completedAt.Time } if len(specJSON) > 0 { if err := json.Unmarshal(specJSON, &entry.Spec); err != nil { return nil, fmt.Errorf("unmarshal build spec: %w", err) } } if len(resultJSON) > 0 { entry.Result = &domain.BuildResult{} if err := json.Unmarshal(resultJSON, entry.Result); err != nil { return nil, fmt.Errorf("unmarshal build result: %w", err) } } return &entry, nil }