rdev/internal/adapter/postgres/command_queue.go
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

418 lines
11 KiB
Go

// Package postgres provides PostgreSQL-based implementations of port interfaces.
package postgres
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// CommandQueueRepository implements port.CommandQueue using PostgreSQL.
type CommandQueueRepository struct {
db *sql.DB
}
// NewCommandQueueRepository creates a new PostgreSQL command queue repository.
func NewCommandQueueRepository(db *sql.DB) *CommandQueueRepository {
return &CommandQueueRepository{db: db}
}
// Ensure CommandQueueRepository implements port.CommandQueue at compile time.
var _ port.CommandQueue = (*CommandQueueRepository)(nil)
// Enqueue adds a command to the queue.
func (r *CommandQueueRepository) Enqueue(ctx context.Context, cmd *domain.QueuedCommand) error {
var id string
err := r.db.QueryRowContext(ctx, `
INSERT INTO command_queue (project_id, command, command_type, working_dir, status, priority, api_key_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id, created_at
`, cmd.ProjectID, cmd.Command, string(cmd.CommandType), nullString(cmd.WorkingDir),
string(cmd.Status), cmd.Priority, nullString(cmd.APIKeyID)).Scan(&id, &cmd.CreatedAt)
if err != nil {
return fmt.Errorf("enqueue command: %w", err)
}
cmd.ID = domain.QueuedCommandID(id)
return nil
}
// Dequeue retrieves and locks the next pending command for a project.
// Uses FOR UPDATE SKIP LOCKED for safe concurrent access.
func (r *CommandQueueRepository) Dequeue(ctx context.Context, projectID string) (*domain.QueuedCommand, error) {
// Use a transaction to atomically select and update
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
var cmd domain.QueuedCommand
var id string
var commandType string
var status string
var workingDir sql.NullString
var apiKeyID sql.NullString
// Select the highest priority pending command and lock it
err = tx.QueryRowContext(ctx, `
SELECT id, project_id, command, command_type, working_dir, status, priority, created_at, api_key_id
FROM command_queue
WHERE project_id = $1 AND status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
`, projectID).Scan(
&id,
&cmd.ProjectID,
&cmd.Command,
&commandType,
&workingDir,
&status,
&cmd.Priority,
&cmd.CreatedAt,
&apiKeyID,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil // No pending commands
}
if err != nil {
return nil, fmt.Errorf("select pending command: %w", err)
}
cmd.ID = domain.QueuedCommandID(id)
cmd.CommandType = domain.CommandType(commandType)
cmd.Status = domain.QueueStatus(status)
if workingDir.Valid {
cmd.WorkingDir = workingDir.String
}
if apiKeyID.Valid {
cmd.APIKeyID = apiKeyID.String
}
// Update status to running
now := time.Now()
_, err = tx.ExecContext(ctx, `
UPDATE command_queue
SET status = 'running', started_at = $1
WHERE id = $2
`, now, id)
if err != nil {
return nil, fmt.Errorf("update to running: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("commit transaction: %w", err)
}
cmd.Status = domain.QueueStatusRunning
cmd.StartedAt = &now
return &cmd, nil
}
// UpdateStatus updates the status of a queued command.
func (r *CommandQueueRepository) UpdateStatus(ctx context.Context, cmdID domain.QueuedCommandID, status domain.QueueStatus, result *domain.QueuedCommandResult) error {
var err error
if result != nil {
now := time.Now()
_, err = r.db.ExecContext(ctx, `
UPDATE command_queue
SET status = $1, completed_at = $2, result_exit_code = $3, result_output = $4, result_error = $5
WHERE id = $6
`, string(status), now, result.ExitCode, nullString(result.Output), nullString(result.Error), string(cmdID))
} else {
_, err = r.db.ExecContext(ctx, `
UPDATE command_queue
SET status = $1
WHERE id = $2
`, string(status), string(cmdID))
}
if err != nil {
return fmt.Errorf("update status: %w", err)
}
return nil
}
// GetByID retrieves a specific queued command by ID.
func (r *CommandQueueRepository) GetByID(ctx context.Context, cmdID domain.QueuedCommandID) (*domain.QueuedCommand, error) {
var cmd domain.QueuedCommand
var id string
var commandType string
var status string
var workingDir sql.NullString
var startedAt sql.NullTime
var completedAt sql.NullTime
var exitCode sql.NullInt32
var output sql.NullString
var resultError sql.NullString
var apiKeyID sql.NullString
err := r.db.QueryRowContext(ctx, `
SELECT id, project_id, command, command_type, working_dir, status, priority,
created_at, started_at, completed_at, result_exit_code, result_output, result_error, api_key_id
FROM command_queue
WHERE id = $1
`, string(cmdID)).Scan(
&id,
&cmd.ProjectID,
&cmd.Command,
&commandType,
&workingDir,
&status,
&cmd.Priority,
&cmd.CreatedAt,
&startedAt,
&completedAt,
&exitCode,
&output,
&resultError,
&apiKeyID,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, domain.ErrCommandNotFound
}
if err != nil {
return nil, fmt.Errorf("get command: %w", err)
}
cmd.ID = domain.QueuedCommandID(id)
cmd.CommandType = domain.CommandType(commandType)
cmd.Status = domain.QueueStatus(status)
if workingDir.Valid {
cmd.WorkingDir = workingDir.String
}
if startedAt.Valid {
cmd.StartedAt = &startedAt.Time
}
if completedAt.Valid {
cmd.CompletedAt = &completedAt.Time
}
if exitCode.Valid {
ec := int(exitCode.Int32)
cmd.ExitCode = &ec
}
if output.Valid {
cmd.Output = output.String
}
if resultError.Valid {
cmd.Error = resultError.String
}
if apiKeyID.Valid {
cmd.APIKeyID = apiKeyID.String
}
return &cmd, nil
}
// List returns queued commands for a project with optional filters.
func (r *CommandQueueRepository) List(ctx context.Context, projectID string, filters *domain.QueueFilters) ([]*domain.QueuedCommand, error) {
if filters == nil {
filters = domain.DefaultQueueFilters()
}
// Build query with optional filters
query := `
SELECT id, project_id, command, command_type, working_dir, status, priority,
created_at, started_at, completed_at, result_exit_code, result_output, result_error, api_key_id
FROM command_queue
WHERE project_id = $1
`
args := []any{projectID}
argNum := 2
if filters.Status != nil {
query += fmt.Sprintf(" AND status = $%d", argNum)
args = append(args, string(*filters.Status))
argNum++
}
// Sort order
if filters.SortOrder == "asc" {
query += " ORDER BY created_at ASC"
} else {
query += " ORDER BY created_at DESC"
}
// Pagination
query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argNum, argNum+1)
args = append(args, filters.Limit, filters.Offset)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("list commands: %w", err)
}
defer func() { _ = rows.Close() }()
var commands []*domain.QueuedCommand
for rows.Next() {
var cmd domain.QueuedCommand
var id string
var commandType string
var status string
var workingDir sql.NullString
var startedAt sql.NullTime
var completedAt sql.NullTime
var exitCode sql.NullInt32
var output sql.NullString
var resultError sql.NullString
var apiKeyID sql.NullString
if err := rows.Scan(
&id,
&cmd.ProjectID,
&cmd.Command,
&commandType,
&workingDir,
&status,
&cmd.Priority,
&cmd.CreatedAt,
&startedAt,
&completedAt,
&exitCode,
&output,
&resultError,
&apiKeyID,
); err != nil {
return nil, fmt.Errorf("scan command: %w", err)
}
cmd.ID = domain.QueuedCommandID(id)
cmd.CommandType = domain.CommandType(commandType)
cmd.Status = domain.QueueStatus(status)
if workingDir.Valid {
cmd.WorkingDir = workingDir.String
}
if startedAt.Valid {
cmd.StartedAt = &startedAt.Time
}
if completedAt.Valid {
cmd.CompletedAt = &completedAt.Time
}
if exitCode.Valid {
ec := int(exitCode.Int32)
cmd.ExitCode = &ec
}
if output.Valid {
cmd.Output = output.String
}
if resultError.Valid {
cmd.Error = resultError.String
}
if apiKeyID.Valid {
cmd.APIKeyID = apiKeyID.String
}
commands = append(commands, &cmd)
}
return commands, nil
}
// Cancel marks a pending command as cancelled.
func (r *CommandQueueRepository) Cancel(ctx context.Context, cmdID domain.QueuedCommandID) error {
result, err := r.db.ExecContext(ctx, `
UPDATE command_queue
SET status = 'cancelled', completed_at = NOW()
WHERE id = $1 AND status = 'pending'
`, string(cmdID))
if err != nil {
return fmt.Errorf("cancel command: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if rows == 0 {
// Check if command exists
var exists bool
err := r.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM command_queue WHERE id = $1)`, string(cmdID)).Scan(&exists)
if err != nil {
return fmt.Errorf("check exists: %w", err)
}
if !exists {
return domain.ErrCommandNotFound
}
// Command exists but not in pending state
return fmt.Errorf("command is not in pending state")
}
return nil
}
// GetStats returns queue statistics for a project (or all projects if empty).
func (r *CommandQueueRepository) GetStats(ctx context.Context, projectID string) (*domain.QueueStats, error) {
var stats domain.QueueStats
query := `
SELECT
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'running') as running,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled
FROM command_queue
`
var err error
if projectID != "" {
query += " WHERE project_id = $1"
err = r.db.QueryRowContext(ctx, query, projectID).Scan(
&stats.TotalPending,
&stats.TotalRunning,
&stats.TotalCompleted,
&stats.TotalFailed,
&stats.TotalCancelled,
)
} else {
err = r.db.QueryRowContext(ctx, query).Scan(
&stats.TotalPending,
&stats.TotalRunning,
&stats.TotalCompleted,
&stats.TotalFailed,
&stats.TotalCancelled,
)
}
if err != nil {
return nil, fmt.Errorf("get stats: %w", err)
}
return &stats, nil
}
// CleanupOld removes completed/failed/cancelled commands older than the specified duration.
func (r *CommandQueueRepository) CleanupOld(ctx context.Context, olderThanDays int) (int64, error) {
result, err := r.db.ExecContext(ctx, `
DELETE FROM command_queue
WHERE status IN ('completed', 'failed', 'cancelled')
AND completed_at < NOW() - INTERVAL '1 day' * $1
`, olderThanDays)
if err != nil {
return 0, fmt.Errorf("cleanup old commands: %w", err)
}
return result.RowsAffected()
}
// nullString returns a sql.NullString for optional string fields.
func nullString(s string) sql.NullString {
if s == "" {
return sql.NullString{}
}
return sql.NullString{String: s, Valid: true}
}