Operations Audit (new feature): - Add Operation domain model with status tracking (pending, running, completed, failed, cancelled) - Add OperationRepository with PostgreSQL implementation - Add OperationService for CRUD and lifecycle management - Add operations handlers (list, get, cancel endpoints) - Add migration 015_operations.sql for operations table - Add operation cleanup worker for stale operation handling - Add ErrOperationNotFound to domain errors Template Improvements: - Add CLAUDE.md configuration files to astro-landing, default, and go-api templates - Fix PORT template variable usage in nginx configs for app templates - Add replace directives for local pkg module in Go templates - Simplify Go service/worker Dockerfiles for workspace builds - Fix TypeScript error in logger template Other: - Refactor landing-test.sh cookbook script - Update CLAUDE.md version reference Note: Some files exceed 500-line limit (pre-existing debt + new feature) - component.go: 550 lines (unchanged, pre-existing) - main.go: 522 lines (added operations wiring) - operation_repo.go: 569 lines (new, needs splitting) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
570 lines
14 KiB
Go
570 lines
14 KiB
Go
// Package postgres provides PostgreSQL-based implementations of port interfaces.
|
|
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// OperationRepository is the PostgreSQL-backed implementation of
// port.OperationRepository. Every method issues its SQL through the wrapped
// *sql.DB handle.
type OperationRepository struct {
	// db is the database handle used by all query methods.
	db *sql.DB
}
|
|
|
|
// NewOperationRepository creates a new PostgreSQL operation repository.
|
|
func NewOperationRepository(db *sql.DB) *OperationRepository {
|
|
return &OperationRepository{db: db}
|
|
}
|
|
|
|
// Ensure OperationRepository implements port.OperationRepository at compile time.
|
|
var _ port.OperationRepository = (*OperationRepository)(nil)
|
|
|
|
// Create creates a new operation record.
|
|
func (r *OperationRepository) Create(ctx context.Context, op *domain.Operation) error {
|
|
inputJSON, err := json.Marshal(op.Input)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal input: %w", err)
|
|
}
|
|
|
|
stepsJSON, err := json.Marshal(op.Steps)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal steps: %w", err)
|
|
}
|
|
|
|
_, err = r.db.ExecContext(ctx, `
|
|
INSERT INTO operations (
|
|
id, project_id, type, status, request_id, triggered_by,
|
|
commit_sha, external_ref, started_at, input, steps
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
|
`,
|
|
op.ID,
|
|
op.ProjectID,
|
|
string(op.Type),
|
|
string(op.Status),
|
|
nullString(op.RequestID),
|
|
nullString(op.TriggeredBy),
|
|
nullString(op.CommitSHA),
|
|
nullString(op.ExternalRef),
|
|
op.StartedAt,
|
|
inputJSON,
|
|
stepsJSON,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert operation: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update updates an existing operation record.
|
|
func (r *OperationRepository) Update(ctx context.Context, op *domain.Operation) error {
|
|
inputJSON, err := json.Marshal(op.Input)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal input: %w", err)
|
|
}
|
|
|
|
outputJSON, err := json.Marshal(op.Output)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal output: %w", err)
|
|
}
|
|
|
|
stepsJSON, err := json.Marshal(op.Steps)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal steps: %w", err)
|
|
}
|
|
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations SET
|
|
status = $2,
|
|
request_id = $3,
|
|
triggered_by = $4,
|
|
commit_sha = $5,
|
|
external_ref = $6,
|
|
completed_at = $7,
|
|
duration_ms = $8,
|
|
input = $9,
|
|
output = $10,
|
|
error = $11,
|
|
error_detail = $12,
|
|
steps = $13
|
|
WHERE id = $1
|
|
`,
|
|
op.ID,
|
|
string(op.Status),
|
|
nullString(op.RequestID),
|
|
nullString(op.TriggeredBy),
|
|
nullString(op.CommitSHA),
|
|
nullString(op.ExternalRef),
|
|
nullTime(op.CompletedAt),
|
|
nullInt64(op.DurationMs),
|
|
inputJSON,
|
|
outputJSON,
|
|
nullString(op.Error),
|
|
nullString(op.ErrorDetail),
|
|
stepsJSON,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update operation: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves an operation by ID.
|
|
func (r *OperationRepository) Get(ctx context.Context, id string) (*domain.Operation, error) {
|
|
row := r.db.QueryRowContext(ctx, `
|
|
SELECT id, project_id, type, status, request_id, triggered_by,
|
|
commit_sha, external_ref, started_at, completed_at, duration_ms,
|
|
input, output, error, error_detail, steps, created_at
|
|
FROM operations
|
|
WHERE id = $1
|
|
`, id)
|
|
|
|
return r.scanOperation(row)
|
|
}
|
|
|
|
// GetByCommitSHA finds the operation that created a specific commit.
|
|
func (r *OperationRepository) GetByCommitSHA(ctx context.Context, projectID, sha string) (*domain.Operation, error) {
|
|
row := r.db.QueryRowContext(ctx, `
|
|
SELECT id, project_id, type, status, request_id, triggered_by,
|
|
commit_sha, external_ref, started_at, completed_at, duration_ms,
|
|
input, output, error, error_detail, steps, created_at
|
|
FROM operations
|
|
WHERE project_id = $1 AND commit_sha = $2
|
|
ORDER BY started_at DESC
|
|
LIMIT 1
|
|
`, projectID, sha)
|
|
|
|
return r.scanOperation(row)
|
|
}
|
|
|
|
// List returns operations matching the filter criteria.
|
|
func (r *OperationRepository) List(ctx context.Context, filter domain.OperationFilters) ([]*domain.Operation, error) {
|
|
filter.Normalize()
|
|
|
|
query := strings.Builder{}
|
|
query.WriteString(`
|
|
SELECT id, project_id, type, status, request_id, triggered_by,
|
|
commit_sha, external_ref, started_at, completed_at, duration_ms,
|
|
input, output, error, error_detail, steps, created_at
|
|
FROM operations
|
|
WHERE project_id = $1
|
|
`)
|
|
|
|
args := []any{filter.ProjectID}
|
|
argNum := 2
|
|
|
|
if filter.Type != "" {
|
|
fmt.Fprintf(&query, " AND type = $%d", argNum)
|
|
args = append(args, string(filter.Type))
|
|
argNum++
|
|
}
|
|
|
|
if filter.Status != "" {
|
|
fmt.Fprintf(&query, " AND status = $%d", argNum)
|
|
args = append(args, string(filter.Status))
|
|
argNum++
|
|
}
|
|
|
|
if filter.CommitSHA != "" {
|
|
fmt.Fprintf(&query, " AND commit_sha = $%d", argNum)
|
|
args = append(args, filter.CommitSHA)
|
|
argNum++
|
|
}
|
|
|
|
if !filter.Since.IsZero() {
|
|
fmt.Fprintf(&query, " AND started_at >= $%d", argNum)
|
|
args = append(args, filter.Since)
|
|
argNum++
|
|
}
|
|
|
|
query.WriteString(" ORDER BY started_at DESC")
|
|
|
|
fmt.Fprintf(&query, " LIMIT $%d", argNum)
|
|
args = append(args, filter.Limit)
|
|
|
|
rows, err := r.db.QueryContext(ctx, query.String(), args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query operations: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var operations []*domain.Operation
|
|
for rows.Next() {
|
|
op, err := r.scanOperationRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
operations = append(operations, op)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate operations: %w", err)
|
|
}
|
|
|
|
return operations, nil
|
|
}
|
|
|
|
// AddStep appends a new step to an operation.
|
|
func (r *OperationRepository) AddStep(ctx context.Context, operationID string, step domain.OperationStep) error {
|
|
stepJSON, err := json.Marshal(step)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal step: %w", err)
|
|
}
|
|
|
|
// Use JSONB array concatenation to append the step
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations
|
|
SET steps = steps || $2::jsonb
|
|
WHERE id = $1
|
|
`, operationID, stepJSON)
|
|
if err != nil {
|
|
return fmt.Errorf("add step: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateStep updates an existing step within an operation.
|
|
func (r *OperationRepository) UpdateStep(ctx context.Context, operationID string, step domain.OperationStep) error {
|
|
// Get current steps
|
|
op, err := r.Get(ctx, operationID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find and update the step by name
|
|
found := false
|
|
for i := range op.Steps {
|
|
if op.Steps[i].Name == step.Name {
|
|
op.Steps[i] = step
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
return fmt.Errorf("step %q not found", step.Name)
|
|
}
|
|
|
|
stepsJSON, err := json.Marshal(op.Steps)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal steps: %w", err)
|
|
}
|
|
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations
|
|
SET steps = $2
|
|
WHERE id = $1
|
|
`, operationID, stepsJSON)
|
|
if err != nil {
|
|
return fmt.Errorf("update step: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Complete marks an operation as completed or failed.
|
|
func (r *OperationRepository) Complete(ctx context.Context, operationID string, status domain.OperationStatus, output map[string]any, errMsg, errDetail string) error {
|
|
now := time.Now()
|
|
|
|
// Get start time to calculate duration
|
|
var startedAt time.Time
|
|
err := r.db.QueryRowContext(ctx, `SELECT started_at FROM operations WHERE id = $1`, operationID).Scan(&startedAt)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("get started_at: %w", err)
|
|
}
|
|
|
|
durationMs := now.Sub(startedAt).Milliseconds()
|
|
|
|
outputJSON, err := json.Marshal(output)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal output: %w", err)
|
|
}
|
|
|
|
// Truncate error detail if needed
|
|
errDetail = domain.TruncateErrorDetail(errDetail)
|
|
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations
|
|
SET status = $2, completed_at = $3, duration_ms = $4, output = $5, error = $6, error_detail = $7
|
|
WHERE id = $1
|
|
`, operationID, string(status), now, durationMs, outputJSON, nullString(errMsg), nullString(errDetail))
|
|
if err != nil {
|
|
return fmt.Errorf("complete operation: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetCommitSHA updates the commit_sha field for an operation.
|
|
func (r *OperationRepository) SetCommitSHA(ctx context.Context, operationID, sha string) error {
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations SET commit_sha = $2 WHERE id = $1
|
|
`, operationID, sha)
|
|
if err != nil {
|
|
return fmt.Errorf("set commit_sha: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetTriggeredBy sets the triggered_by field to link to a parent operation.
|
|
func (r *OperationRepository) SetTriggeredBy(ctx context.Context, operationID, parentID string) error {
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE operations SET triggered_by = $2 WHERE id = $1
|
|
`, operationID, parentID)
|
|
if err != nil {
|
|
return fmt.Errorf("set triggered_by: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrOperationNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteOlderThan removes operations older than the specified time.
|
|
func (r *OperationRepository) DeleteOlderThan(ctx context.Context, cutoff time.Time) (int64, error) {
|
|
res, err := r.db.ExecContext(ctx, `
|
|
DELETE FROM operations WHERE started_at < $1
|
|
`, cutoff)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("delete operations: %w", err)
|
|
}
|
|
|
|
return res.RowsAffected()
|
|
}
|
|
|
|
// scanOperation scans a single operation from a QueryRow result.
|
|
func (r *OperationRepository) scanOperation(row *sql.Row) (*domain.Operation, error) {
|
|
var op domain.Operation
|
|
var opType, status string
|
|
var requestID, triggeredBy, commitSHA, externalRef sql.NullString
|
|
var completedAt sql.NullTime
|
|
var durationMs sql.NullInt64
|
|
var inputJSON, outputJSON, stepsJSON []byte
|
|
var errMsg, errDetail sql.NullString
|
|
|
|
err := row.Scan(
|
|
&op.ID,
|
|
&op.ProjectID,
|
|
&opType,
|
|
&status,
|
|
&requestID,
|
|
&triggeredBy,
|
|
&commitSHA,
|
|
&externalRef,
|
|
&op.StartedAt,
|
|
&completedAt,
|
|
&durationMs,
|
|
&inputJSON,
|
|
&outputJSON,
|
|
&errMsg,
|
|
&errDetail,
|
|
&stepsJSON,
|
|
&op.CreatedAt,
|
|
)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, domain.ErrOperationNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan operation: %w", err)
|
|
}
|
|
|
|
op.Type = domain.OperationType(opType)
|
|
op.Status = domain.OperationStatus(status)
|
|
|
|
if requestID.Valid {
|
|
op.RequestID = requestID.String
|
|
}
|
|
if triggeredBy.Valid {
|
|
op.TriggeredBy = triggeredBy.String
|
|
}
|
|
if commitSHA.Valid {
|
|
op.CommitSHA = commitSHA.String
|
|
}
|
|
if externalRef.Valid {
|
|
op.ExternalRef = externalRef.String
|
|
}
|
|
if completedAt.Valid {
|
|
op.CompletedAt = &completedAt.Time
|
|
}
|
|
if durationMs.Valid {
|
|
op.DurationMs = durationMs.Int64
|
|
}
|
|
if errMsg.Valid {
|
|
op.Error = errMsg.String
|
|
}
|
|
if errDetail.Valid {
|
|
op.ErrorDetail = errDetail.String
|
|
}
|
|
|
|
if len(inputJSON) > 0 {
|
|
if err := json.Unmarshal(inputJSON, &op.Input); err != nil {
|
|
return nil, fmt.Errorf("unmarshal input: %w", err)
|
|
}
|
|
}
|
|
if len(outputJSON) > 0 {
|
|
if err := json.Unmarshal(outputJSON, &op.Output); err != nil {
|
|
return nil, fmt.Errorf("unmarshal output: %w", err)
|
|
}
|
|
}
|
|
if len(stepsJSON) > 0 {
|
|
if err := json.Unmarshal(stepsJSON, &op.Steps); err != nil {
|
|
return nil, fmt.Errorf("unmarshal steps: %w", err)
|
|
}
|
|
}
|
|
|
|
return &op, nil
|
|
}
|
|
|
|
// scanOperationRows scans a single operation from a Rows result.
|
|
func (r *OperationRepository) scanOperationRows(rows *sql.Rows) (*domain.Operation, error) {
|
|
var op domain.Operation
|
|
var opType, status string
|
|
var requestID, triggeredBy, commitSHA, externalRef sql.NullString
|
|
var completedAt sql.NullTime
|
|
var durationMs sql.NullInt64
|
|
var inputJSON, outputJSON, stepsJSON []byte
|
|
var errMsg, errDetail sql.NullString
|
|
|
|
err := rows.Scan(
|
|
&op.ID,
|
|
&op.ProjectID,
|
|
&opType,
|
|
&status,
|
|
&requestID,
|
|
&triggeredBy,
|
|
&commitSHA,
|
|
&externalRef,
|
|
&op.StartedAt,
|
|
&completedAt,
|
|
&durationMs,
|
|
&inputJSON,
|
|
&outputJSON,
|
|
&errMsg,
|
|
&errDetail,
|
|
&stepsJSON,
|
|
&op.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan operation: %w", err)
|
|
}
|
|
|
|
op.Type = domain.OperationType(opType)
|
|
op.Status = domain.OperationStatus(status)
|
|
|
|
if requestID.Valid {
|
|
op.RequestID = requestID.String
|
|
}
|
|
if triggeredBy.Valid {
|
|
op.TriggeredBy = triggeredBy.String
|
|
}
|
|
if commitSHA.Valid {
|
|
op.CommitSHA = commitSHA.String
|
|
}
|
|
if externalRef.Valid {
|
|
op.ExternalRef = externalRef.String
|
|
}
|
|
if completedAt.Valid {
|
|
op.CompletedAt = &completedAt.Time
|
|
}
|
|
if durationMs.Valid {
|
|
op.DurationMs = durationMs.Int64
|
|
}
|
|
if errMsg.Valid {
|
|
op.Error = errMsg.String
|
|
}
|
|
if errDetail.Valid {
|
|
op.ErrorDetail = errDetail.String
|
|
}
|
|
|
|
if len(inputJSON) > 0 {
|
|
if err := json.Unmarshal(inputJSON, &op.Input); err != nil {
|
|
return nil, fmt.Errorf("unmarshal input: %w", err)
|
|
}
|
|
}
|
|
if len(outputJSON) > 0 {
|
|
if err := json.Unmarshal(outputJSON, &op.Output); err != nil {
|
|
return nil, fmt.Errorf("unmarshal output: %w", err)
|
|
}
|
|
}
|
|
if len(stepsJSON) > 0 {
|
|
if err := json.Unmarshal(stepsJSON, &op.Steps); err != nil {
|
|
return nil, fmt.Errorf("unmarshal steps: %w", err)
|
|
}
|
|
}
|
|
|
|
return &op, nil
|
|
}
|
|
|
|
// nullInt64 wraps v in a sql.NullInt64. A value of 0 yields the invalid
// (SQL NULL) zero value; any other value is marked valid.
func nullInt64(v int64) sql.NullInt64 {
	if v == 0 {
		return sql.NullInt64{}
	}
	return sql.NullInt64{Int64: v, Valid: true}
}
|
|
|
|
// nullTime wraps t in a sql.NullTime. A nil pointer yields the invalid
// (SQL NULL) zero value; otherwise the pointed-to time is copied and marked
// valid.
func nullTime(t *time.Time) sql.NullTime {
	var out sql.NullTime
	if t != nil {
		out.Time, out.Valid = *t, true
	}
	return out
}
|