// Package postgres provides PostgreSQL-based implementations of port interfaces. package postgres import ( "context" "database/sql" "encoding/json" "errors" "fmt" "strings" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // OperationRepository implements port.OperationRepository using PostgreSQL. type OperationRepository struct { db *sql.DB } // NewOperationRepository creates a new PostgreSQL operation repository. func NewOperationRepository(db *sql.DB) *OperationRepository { return &OperationRepository{db: db} } // Ensure OperationRepository implements port.OperationRepository at compile time. var _ port.OperationRepository = (*OperationRepository)(nil) // Create creates a new operation record. func (r *OperationRepository) Create(ctx context.Context, op *domain.Operation) error { inputJSON, err := json.Marshal(op.Input) if err != nil { return fmt.Errorf("marshal input: %w", err) } stepsJSON, err := json.Marshal(op.Steps) if err != nil { return fmt.Errorf("marshal steps: %w", err) } _, err = r.db.ExecContext(ctx, ` INSERT INTO operations ( id, project_id, type, status, request_id, triggered_by, commit_sha, external_ref, started_at, input, steps ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) `, op.ID, op.ProjectID, string(op.Type), string(op.Status), nullString(op.RequestID), nullString(op.TriggeredBy), nullString(op.CommitSHA), nullString(op.ExternalRef), op.StartedAt, inputJSON, stepsJSON, ) if err != nil { return fmt.Errorf("insert operation: %w", err) } return nil } // Update updates an existing operation record. func (r *OperationRepository) Update(ctx context.Context, op *domain.Operation) error { inputJSON, err := json.Marshal(op.Input) if err != nil { return fmt.Errorf("marshal input: %w", err) } outputJSON, err := json.Marshal(op.Output) if err != nil { return fmt.Errorf("marshal output: %w", err) } stepsJSON, err := json.Marshal(op.Steps) if err != nil { return fmt.Errorf("marshal steps: %w", err) } res, err := r.db.ExecContext(ctx, ` UPDATE operations SET status = $2, request_id = $3, triggered_by = $4, commit_sha = $5, external_ref = $6, completed_at = $7, duration_ms = $8, input = $9, output = $10, error = $11, error_detail = $12, steps = $13 WHERE id = $1 `, op.ID, string(op.Status), nullString(op.RequestID), nullString(op.TriggeredBy), nullString(op.CommitSHA), nullString(op.ExternalRef), nullTime(op.CompletedAt), nullInt64(op.DurationMs), inputJSON, outputJSON, nullString(op.Error), nullString(op.ErrorDetail), stepsJSON, ) if err != nil { return fmt.Errorf("update operation: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // Get retrieves an operation by ID. func (r *OperationRepository) Get(ctx context.Context, id string) (*domain.Operation, error) { row := r.db.QueryRowContext(ctx, ` SELECT id, project_id, type, status, request_id, triggered_by, commit_sha, external_ref, started_at, completed_at, duration_ms, input, output, error, error_detail, steps, created_at FROM operations WHERE id = $1 `, id) return r.scanOperation(row) } // GetByCommitSHA finds the operation that created a specific commit. func (r *OperationRepository) GetByCommitSHA(ctx context.Context, projectID, sha string) (*domain.Operation, error) { row := r.db.QueryRowContext(ctx, ` SELECT id, project_id, type, status, request_id, triggered_by, commit_sha, external_ref, started_at, completed_at, duration_ms, input, output, error, error_detail, steps, created_at FROM operations WHERE project_id = $1 AND commit_sha = $2 ORDER BY started_at DESC LIMIT 1 `, projectID, sha) return r.scanOperation(row) } // List returns operations matching the filter criteria. func (r *OperationRepository) List(ctx context.Context, filter domain.OperationFilters) ([]*domain.Operation, error) { filter.Normalize() query := strings.Builder{} query.WriteString(` SELECT id, project_id, type, status, request_id, triggered_by, commit_sha, external_ref, started_at, completed_at, duration_ms, input, output, error, error_detail, steps, created_at FROM operations WHERE project_id = $1 `) args := []any{filter.ProjectID} argNum := 2 if filter.Type != "" { fmt.Fprintf(&query, " AND type = $%d", argNum) args = append(args, string(filter.Type)) argNum++ } if filter.Status != "" { fmt.Fprintf(&query, " AND status = $%d", argNum) args = append(args, string(filter.Status)) argNum++ } if filter.CommitSHA != "" { fmt.Fprintf(&query, " AND commit_sha = $%d", argNum) args = append(args, filter.CommitSHA) argNum++ } if !filter.Since.IsZero() { fmt.Fprintf(&query, " AND started_at >= $%d", argNum) args = append(args, filter.Since) argNum++ } query.WriteString(" ORDER BY started_at DESC") fmt.Fprintf(&query, " LIMIT $%d", argNum) args = append(args, filter.Limit) rows, err := r.db.QueryContext(ctx, query.String(), args...) if err != nil { return nil, fmt.Errorf("query operations: %w", err) } defer func() { _ = rows.Close() }() var operations []*domain.Operation for rows.Next() { op, err := r.scanOperationRows(rows) if err != nil { return nil, err } operations = append(operations, op) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate operations: %w", err) } return operations, nil } // AddStep appends a new step to an operation. func (r *OperationRepository) AddStep(ctx context.Context, operationID string, step domain.OperationStep) error { stepJSON, err := json.Marshal(step) if err != nil { return fmt.Errorf("marshal step: %w", err) } // Use JSONB array concatenation to append the step res, err := r.db.ExecContext(ctx, ` UPDATE operations SET steps = steps || $2::jsonb WHERE id = $1 `, operationID, stepJSON) if err != nil { return fmt.Errorf("add step: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // UpdateStep updates an existing step within an operation. func (r *OperationRepository) UpdateStep(ctx context.Context, operationID string, step domain.OperationStep) error { // Get current steps op, err := r.Get(ctx, operationID) if err != nil { return err } // Find and update the step by name found := false for i := range op.Steps { if op.Steps[i].Name == step.Name { op.Steps[i] = step found = true break } } if !found { return fmt.Errorf("step %q not found", step.Name) } stepsJSON, err := json.Marshal(op.Steps) if err != nil { return fmt.Errorf("marshal steps: %w", err) } res, err := r.db.ExecContext(ctx, ` UPDATE operations SET steps = $2 WHERE id = $1 `, operationID, stepsJSON) if err != nil { return fmt.Errorf("update step: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // Complete marks an operation as completed or failed. func (r *OperationRepository) Complete(ctx context.Context, operationID string, status domain.OperationStatus, output map[string]any, errMsg, errDetail string) error { now := time.Now() // Get start time to calculate duration var startedAt time.Time err := r.db.QueryRowContext(ctx, `SELECT started_at FROM operations WHERE id = $1`, operationID).Scan(&startedAt) if errors.Is(err, sql.ErrNoRows) { return domain.ErrOperationNotFound } if err != nil { return fmt.Errorf("get started_at: %w", err) } durationMs := now.Sub(startedAt).Milliseconds() outputJSON, err := json.Marshal(output) if err != nil { return fmt.Errorf("marshal output: %w", err) } // Truncate error detail if needed errDetail = domain.TruncateErrorDetail(errDetail) res, err := r.db.ExecContext(ctx, ` UPDATE operations SET status = $2, completed_at = $3, duration_ms = $4, output = $5, error = $6, error_detail = $7 WHERE id = $1 `, operationID, string(status), now, durationMs, outputJSON, nullString(errMsg), nullString(errDetail)) if err != nil { return fmt.Errorf("complete operation: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // SetCommitSHA updates the commit_sha field for an operation. func (r *OperationRepository) SetCommitSHA(ctx context.Context, operationID, sha string) error { res, err := r.db.ExecContext(ctx, ` UPDATE operations SET commit_sha = $2 WHERE id = $1 `, operationID, sha) if err != nil { return fmt.Errorf("set commit_sha: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // SetTriggeredBy sets the triggered_by field to link to a parent operation. func (r *OperationRepository) SetTriggeredBy(ctx context.Context, operationID, parentID string) error { res, err := r.db.ExecContext(ctx, ` UPDATE operations SET triggered_by = $2 WHERE id = $1 `, operationID, parentID) if err != nil { return fmt.Errorf("set triggered_by: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrOperationNotFound } return nil } // DeleteOlderThan removes operations older than the specified time. func (r *OperationRepository) DeleteOlderThan(ctx context.Context, cutoff time.Time) (int64, error) { res, err := r.db.ExecContext(ctx, ` DELETE FROM operations WHERE started_at < $1 `, cutoff) if err != nil { return 0, fmt.Errorf("delete operations: %w", err) } return res.RowsAffected() } // scanOperation scans a single operation from a QueryRow result. func (r *OperationRepository) scanOperation(row *sql.Row) (*domain.Operation, error) { var op domain.Operation var opType, status string var requestID, triggeredBy, commitSHA, externalRef sql.NullString var completedAt sql.NullTime var durationMs sql.NullInt64 var inputJSON, outputJSON, stepsJSON []byte var errMsg, errDetail sql.NullString err := row.Scan( &op.ID, &op.ProjectID, &opType, &status, &requestID, &triggeredBy, &commitSHA, &externalRef, &op.StartedAt, &completedAt, &durationMs, &inputJSON, &outputJSON, &errMsg, &errDetail, &stepsJSON, &op.CreatedAt, ) if errors.Is(err, sql.ErrNoRows) { return nil, domain.ErrOperationNotFound } if err != nil { return nil, fmt.Errorf("scan operation: %w", err) } op.Type = domain.OperationType(opType) op.Status = domain.OperationStatus(status) if requestID.Valid { op.RequestID = requestID.String } if triggeredBy.Valid { op.TriggeredBy = triggeredBy.String } if commitSHA.Valid { op.CommitSHA = commitSHA.String } if externalRef.Valid { op.ExternalRef = externalRef.String } if completedAt.Valid { op.CompletedAt = &completedAt.Time } if durationMs.Valid { op.DurationMs = durationMs.Int64 } if errMsg.Valid { op.Error = errMsg.String } if errDetail.Valid { op.ErrorDetail = errDetail.String } if len(inputJSON) > 0 { if err := json.Unmarshal(inputJSON, &op.Input); err != nil { return nil, fmt.Errorf("unmarshal input: %w", err) } } if len(outputJSON) > 0 { if err := json.Unmarshal(outputJSON, &op.Output); err != nil { return nil, fmt.Errorf("unmarshal output: %w", err) } } if len(stepsJSON) > 0 { if err := json.Unmarshal(stepsJSON, &op.Steps); err != nil { return nil, fmt.Errorf("unmarshal steps: %w", err) } } return &op, nil } // scanOperationRows scans a single operation from a Rows result. func (r *OperationRepository) scanOperationRows(rows *sql.Rows) (*domain.Operation, error) { var op domain.Operation var opType, status string var requestID, triggeredBy, commitSHA, externalRef sql.NullString var completedAt sql.NullTime var durationMs sql.NullInt64 var inputJSON, outputJSON, stepsJSON []byte var errMsg, errDetail sql.NullString err := rows.Scan( &op.ID, &op.ProjectID, &opType, &status, &requestID, &triggeredBy, &commitSHA, &externalRef, &op.StartedAt, &completedAt, &durationMs, &inputJSON, &outputJSON, &errMsg, &errDetail, &stepsJSON, &op.CreatedAt, ) if err != nil { return nil, fmt.Errorf("scan operation: %w", err) } op.Type = domain.OperationType(opType) op.Status = domain.OperationStatus(status) if requestID.Valid { op.RequestID = requestID.String } if triggeredBy.Valid { op.TriggeredBy = triggeredBy.String } if commitSHA.Valid { op.CommitSHA = commitSHA.String } if externalRef.Valid { op.ExternalRef = externalRef.String } if completedAt.Valid { op.CompletedAt = &completedAt.Time } if durationMs.Valid { op.DurationMs = durationMs.Int64 } if errMsg.Valid { op.Error = errMsg.String } if errDetail.Valid { op.ErrorDetail = errDetail.String } if len(inputJSON) > 0 { if err := json.Unmarshal(inputJSON, &op.Input); err != nil { return nil, fmt.Errorf("unmarshal input: %w", err) } } if len(outputJSON) > 0 { if err := json.Unmarshal(outputJSON, &op.Output); err != nil { return nil, fmt.Errorf("unmarshal output: %w", err) } } if len(stepsJSON) > 0 { if err := json.Unmarshal(stepsJSON, &op.Steps); err != nil { return nil, fmt.Errorf("unmarshal steps: %w", err) } } return &op, nil } // nullInt64 converts an int64 to sql.NullInt64 (null if 0). func nullInt64(v int64) sql.NullInt64 { return sql.NullInt64{Int64: v, Valid: v != 0} } // nullTime converts a *time.Time to sql.NullTime. func nullTime(t *time.Time) sql.NullTime { if t == nil { return sql.NullTime{} } return sql.NullTime{Time: *t, Valid: true} }