// Package postgres provides PostgreSQL-based implementations of port interfaces. package postgres import ( "context" "database/sql" "errors" "fmt" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // CommandQueueRepository implements port.CommandQueue using PostgreSQL. type CommandQueueRepository struct { db *sql.DB } // NewCommandQueueRepository creates a new PostgreSQL command queue repository. func NewCommandQueueRepository(db *sql.DB) *CommandQueueRepository { return &CommandQueueRepository{db: db} } // Ensure CommandQueueRepository implements port.CommandQueue at compile time. var _ port.CommandQueue = (*CommandQueueRepository)(nil) // Enqueue adds a command to the queue. func (r *CommandQueueRepository) Enqueue(ctx context.Context, cmd *domain.QueuedCommand) error { var id string err := r.db.QueryRowContext(ctx, ` INSERT INTO command_queue (project_id, command, command_type, working_dir, status, priority, api_key_id) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id, created_at `, cmd.ProjectID, cmd.Command, string(cmd.CommandType), nullString(cmd.WorkingDir), string(cmd.Status), cmd.Priority, nullString(cmd.APIKeyID)).Scan(&id, &cmd.CreatedAt) if err != nil { return fmt.Errorf("enqueue command: %w", err) } cmd.ID = domain.QueuedCommandID(id) return nil } // Dequeue retrieves and locks the next pending command for a project. // Uses FOR UPDATE SKIP LOCKED for safe concurrent access. func (r *CommandQueueRepository) Dequeue(ctx context.Context, projectID string) (*domain.QueuedCommand, error) { // Use a transaction to atomically select and update tx, err := r.db.BeginTx(ctx, nil) if err != nil { return nil, fmt.Errorf("begin transaction: %w", err) } defer func() { _ = tx.Rollback() }() var cmd domain.QueuedCommand var id string var commandType string var status string var workingDir sql.NullString var apiKeyID sql.NullString // Select the highest priority pending command and lock it err = tx.QueryRowContext(ctx, ` SELECT id, project_id, command, command_type, working_dir, status, priority, created_at, api_key_id FROM command_queue WHERE project_id = $1 AND status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED `, projectID).Scan( &id, &cmd.ProjectID, &cmd.Command, &commandType, &workingDir, &status, &cmd.Priority, &cmd.CreatedAt, &apiKeyID, ) if errors.Is(err, sql.ErrNoRows) { return nil, nil // No pending commands } if err != nil { return nil, fmt.Errorf("select pending command: %w", err) } cmd.ID = domain.QueuedCommandID(id) cmd.CommandType = domain.CommandType(commandType) cmd.Status = domain.QueueStatus(status) if workingDir.Valid { cmd.WorkingDir = workingDir.String } if apiKeyID.Valid { cmd.APIKeyID = apiKeyID.String } // Update status to running now := time.Now() _, err = tx.ExecContext(ctx, ` UPDATE command_queue SET status = 'running', started_at = $1 WHERE id = $2 `, now, id) if err != nil { return nil, fmt.Errorf("update to running: %w", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("commit transaction: %w", err) } cmd.Status = domain.QueueStatusRunning cmd.StartedAt = &now return &cmd, nil } // UpdateStatus updates the status of a queued command. func (r *CommandQueueRepository) UpdateStatus(ctx context.Context, cmdID domain.QueuedCommandID, status domain.QueueStatus, result *domain.QueuedCommandResult) error { var err error if result != nil { now := time.Now() _, err = r.db.ExecContext(ctx, ` UPDATE command_queue SET status = $1, completed_at = $2, result_exit_code = $3, result_output = $4, result_error = $5 WHERE id = $6 `, string(status), now, result.ExitCode, nullString(result.Output), nullString(result.Error), string(cmdID)) } else { _, err = r.db.ExecContext(ctx, ` UPDATE command_queue SET status = $1 WHERE id = $2 `, string(status), string(cmdID)) } if err != nil { return fmt.Errorf("update status: %w", err) } return nil } // GetByID retrieves a specific queued command by ID. func (r *CommandQueueRepository) GetByID(ctx context.Context, cmdID domain.QueuedCommandID) (*domain.QueuedCommand, error) { var cmd domain.QueuedCommand var id string var commandType string var status string var workingDir sql.NullString var startedAt sql.NullTime var completedAt sql.NullTime var exitCode sql.NullInt32 var output sql.NullString var resultError sql.NullString var apiKeyID sql.NullString err := r.db.QueryRowContext(ctx, ` SELECT id, project_id, command, command_type, working_dir, status, priority, created_at, started_at, completed_at, result_exit_code, result_output, result_error, api_key_id FROM command_queue WHERE id = $1 `, string(cmdID)).Scan( &id, &cmd.ProjectID, &cmd.Command, &commandType, &workingDir, &status, &cmd.Priority, &cmd.CreatedAt, &startedAt, &completedAt, &exitCode, &output, &resultError, &apiKeyID, ) if errors.Is(err, sql.ErrNoRows) { return nil, domain.ErrCommandNotFound } if err != nil { return nil, fmt.Errorf("get command: %w", err) } cmd.ID = domain.QueuedCommandID(id) cmd.CommandType = domain.CommandType(commandType) cmd.Status = domain.QueueStatus(status) if workingDir.Valid { cmd.WorkingDir = workingDir.String } if startedAt.Valid { cmd.StartedAt = &startedAt.Time } if completedAt.Valid { cmd.CompletedAt = &completedAt.Time } if exitCode.Valid { ec := int(exitCode.Int32) cmd.ExitCode = &ec } if output.Valid { cmd.Output = output.String } if resultError.Valid { cmd.Error = resultError.String } if apiKeyID.Valid { cmd.APIKeyID = apiKeyID.String } return &cmd, nil } // List returns queued commands for a project with optional filters. func (r *CommandQueueRepository) List(ctx context.Context, projectID string, filters *domain.QueueFilters) ([]*domain.QueuedCommand, error) { if filters == nil { filters = domain.DefaultQueueFilters() } // Build query with optional filters query := ` SELECT id, project_id, command, command_type, working_dir, status, priority, created_at, started_at, completed_at, result_exit_code, result_output, result_error, api_key_id FROM command_queue WHERE project_id = $1 ` args := []any{projectID} argNum := 2 if filters.Status != nil { query += fmt.Sprintf(" AND status = $%d", argNum) args = append(args, string(*filters.Status)) argNum++ } // Sort order if filters.SortOrder == "asc" { query += " ORDER BY created_at ASC" } else { query += " ORDER BY created_at DESC" } // Pagination query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argNum, argNum+1) args = append(args, filters.Limit, filters.Offset) rows, err := r.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("list commands: %w", err) } defer func() { _ = rows.Close() }() var commands []*domain.QueuedCommand for rows.Next() { var cmd domain.QueuedCommand var id string var commandType string var status string var workingDir sql.NullString var startedAt sql.NullTime var completedAt sql.NullTime var exitCode sql.NullInt32 var output sql.NullString var resultError sql.NullString var apiKeyID sql.NullString if err := rows.Scan( &id, &cmd.ProjectID, &cmd.Command, &commandType, &workingDir, &status, &cmd.Priority, &cmd.CreatedAt, &startedAt, &completedAt, &exitCode, &output, &resultError, &apiKeyID, ); err != nil { return nil, fmt.Errorf("scan command: %w", err) } cmd.ID = domain.QueuedCommandID(id) cmd.CommandType = domain.CommandType(commandType) cmd.Status = domain.QueueStatus(status) if workingDir.Valid { cmd.WorkingDir = workingDir.String } if startedAt.Valid { cmd.StartedAt = &startedAt.Time } if completedAt.Valid { cmd.CompletedAt = &completedAt.Time } if exitCode.Valid { ec := int(exitCode.Int32) cmd.ExitCode = &ec } if output.Valid { cmd.Output = output.String } if resultError.Valid { cmd.Error = resultError.String } if apiKeyID.Valid { cmd.APIKeyID = apiKeyID.String } commands = append(commands, &cmd) } return commands, nil } // Cancel marks a pending command as cancelled. func (r *CommandQueueRepository) Cancel(ctx context.Context, cmdID domain.QueuedCommandID) error { result, err := r.db.ExecContext(ctx, ` UPDATE command_queue SET status = 'cancelled', completed_at = NOW() WHERE id = $1 AND status = 'pending' `, string(cmdID)) if err != nil { return fmt.Errorf("cancel command: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { // Check if command exists var exists bool err := r.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM command_queue WHERE id = $1)`, string(cmdID)).Scan(&exists) if err != nil { return fmt.Errorf("check exists: %w", err) } if !exists { return domain.ErrCommandNotFound } // Command exists but not in pending state return fmt.Errorf("command is not in pending state") } return nil } // GetStats returns queue statistics for a project (or all projects if empty). func (r *CommandQueueRepository) GetStats(ctx context.Context, projectID string) (*domain.QueueStats, error) { var stats domain.QueueStats query := ` SELECT COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running, COUNT(*) FILTER (WHERE status = 'completed') as completed, COUNT(*) FILTER (WHERE status = 'failed') as failed, COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled FROM command_queue ` var err error if projectID != "" { query += " WHERE project_id = $1" err = r.db.QueryRowContext(ctx, query, projectID).Scan( &stats.TotalPending, &stats.TotalRunning, &stats.TotalCompleted, &stats.TotalFailed, &stats.TotalCancelled, ) } else { err = r.db.QueryRowContext(ctx, query).Scan( &stats.TotalPending, &stats.TotalRunning, &stats.TotalCompleted, &stats.TotalFailed, &stats.TotalCancelled, ) } if err != nil { return nil, fmt.Errorf("get stats: %w", err) } return &stats, nil } // CleanupOld removes completed/failed/cancelled commands older than the specified duration. func (r *CommandQueueRepository) CleanupOld(ctx context.Context, olderThanDays int) (int64, error) { result, err := r.db.ExecContext(ctx, ` DELETE FROM command_queue WHERE status IN ('completed', 'failed', 'cancelled') AND completed_at < NOW() - INTERVAL '1 day' * $1 `, olderThanDays) if err != nil { return 0, fmt.Errorf("cleanup old commands: %w", err) } return result.RowsAffected() } // nullString returns a sql.NullString for optional string fields. func nullString(s string) sql.NullString { if s == "" { return sql.NullString{} } return sql.NullString{String: s, Valid: true} }