rdev/internal/adapter/postgres/command_queue_test.go
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

488 lines
13 KiB
Go

package postgres
import (
"context"
"database/sql"
"testing"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/testutil"
)
// cleanupTestQueue deletes every command_queue row whose project_id starts
// with "test-". It is best-effort: a failed delete is logged on t and does not
// fail the calling test.
func cleanupTestQueue(t *testing.T, db *sql.DB) {
	t.Helper()

	const stmt = "DELETE FROM command_queue WHERE project_id LIKE 'test-%'"
	if _, execErr := db.Exec(stmt); execErr != nil {
		t.Logf("cleanup test queue: %v", execErr)
	}
}
// TestCommandQueueRepository_Enqueue checks that Enqueue accepts a command,
// populates ID and CreatedAt on the value it was given, and that WorkingDir
// can be read back through GetByID.
func TestCommandQueueRepository_Enqueue(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })

	var (
		repo = NewCommandQueueRepository(db)
		ctx  = context.Background()
	)

	t.Run("enqueues command successfully", func(t *testing.T) {
		queued := &domain.QueuedCommand{
			ProjectID:   "test-proj-enqueue-1",
			Command:     "explain this code",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
			Priority:    0,
			APIKeyID:    "key-1",
		}

		if err := repo.Enqueue(ctx, queued); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		// Enqueue is expected to fill these in on the caller's struct.
		if queued.ID == "" {
			t.Error("ID should be set after enqueue")
		}
		if queued.CreatedAt.IsZero() {
			t.Error("CreatedAt should be set after enqueue")
		}
	})

	t.Run("enqueues with working directory", func(t *testing.T) {
		const wantDir = "/tmp"

		queued := &domain.QueuedCommand{
			ProjectID:   "test-proj-enqueue-2",
			Command:     "ls -la",
			CommandType: domain.CommandTypeShell,
			WorkingDir:  wantDir,
			Status:      domain.QueueStatusPending,
			Priority:    1,
		}

		if err := repo.Enqueue(ctx, queued); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		// Read the row back to confirm the working directory was persisted.
		stored, err := repo.GetByID(ctx, queued.ID)
		if err != nil {
			t.Fatalf("GetByID() error = %v", err)
		}
		if got := stored.WorkingDir; got != wantDir {
			t.Errorf("WorkingDir = %q, want %q", got, wantDir)
		}
	})
}
// TestCommandQueueRepository_Dequeue exercises Dequeue using the database
// returned by testutil.TestDB. It covers three cases: a pending command comes
// back with running status and StartedAt set, a project with nothing queued
// yields nil without an error, and a higher-priority command is returned
// ahead of an older lower-priority one.
func TestCommandQueueRepository_Dequeue(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()

	t.Run("dequeues pending command", func(t *testing.T) {
		// Create a pending command.
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-dequeue-1",
			Command:     "test command",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
			Priority:    0,
		}
		// Fail on setup errors so a broken Enqueue is not misreported as
		// "Dequeue() returned nil" further down.
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		// Dequeue it.
		dequeued, err := repo.Dequeue(ctx, "test-proj-dequeue-1")
		if err != nil {
			t.Fatalf("Dequeue() error = %v", err)
		}
		if dequeued == nil {
			t.Fatal("Dequeue() returned nil")
		}
		if dequeued.Status != domain.QueueStatusRunning {
			t.Errorf("Status = %q, want %q", dequeued.Status, domain.QueueStatusRunning)
		}
		if dequeued.StartedAt == nil {
			t.Error("StartedAt should be set after dequeue")
		}
	})

	t.Run("returns nil when no pending commands", func(t *testing.T) {
		dequeued, err := repo.Dequeue(ctx, "test-proj-dequeue-empty")
		if err != nil {
			t.Fatalf("Dequeue() error = %v", err)
		}
		if dequeued != nil {
			t.Error("Dequeue() should return nil when no pending commands")
		}
	})

	t.Run("dequeues highest priority first", func(t *testing.T) {
		projectID := "test-proj-dequeue-priority"

		// Create commands with different priorities; the low-priority one is
		// enqueued first so plain FIFO ordering would pick it.
		low := &domain.QueuedCommand{
			ProjectID:   projectID,
			Command:     "low priority",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
			Priority:    0,
		}
		if err := repo.Enqueue(ctx, low); err != nil {
			t.Fatalf("Enqueue(low) error = %v", err)
		}

		// Small delay to ensure different timestamps.
		time.Sleep(10 * time.Millisecond)

		high := &domain.QueuedCommand{
			ProjectID:   projectID,
			Command:     "high priority",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
			Priority:    10,
		}
		if err := repo.Enqueue(ctx, high); err != nil {
			t.Fatalf("Enqueue(high) error = %v", err)
		}

		// Dequeue should get high priority first.
		dequeued, err := repo.Dequeue(ctx, projectID)
		if err != nil {
			t.Fatalf("Dequeue() error = %v", err)
		}
		// Dequeue may legitimately return (nil, nil); guard before reading
		// fields so the test fails cleanly instead of panicking.
		if dequeued == nil {
			t.Fatal("Dequeue() returned nil, want the high priority command")
		}
		if dequeued.Command != "high priority" {
			t.Errorf("Command = %q, want %q", dequeued.Command, "high priority")
		}
	})
}
// TestCommandQueueRepository_UpdateStatus verifies that UpdateStatus changes
// a command's status, both with a nil result and with a result whose exit
// code and output are read back through GetByID.
func TestCommandQueueRepository_UpdateStatus(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()

	t.Run("updates status without result", func(t *testing.T) {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-status-1",
			Command:     "test",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
		}
		// Setup must succeed, otherwise cmd.ID is empty and the assertions
		// below would be testing the wrong thing.
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusRunning, nil)
		if err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		// Check the lookup error before dereferencing the result.
		retrieved, err := repo.GetByID(ctx, cmd.ID)
		if err != nil {
			t.Fatalf("GetByID() error = %v", err)
		}
		if retrieved.Status != domain.QueueStatusRunning {
			t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusRunning)
		}
	})

	t.Run("updates status with result", func(t *testing.T) {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-status-2",
			Command:     "test",
			CommandType: domain.CommandTypeShell,
			Status:      domain.QueueStatusRunning,
		}
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		result := &domain.QueuedCommandResult{
			ExitCode: 0,
			Output:   "success output",
			Error:    "",
		}
		err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusCompleted, result)
		if err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		retrieved, err := repo.GetByID(ctx, cmd.ID)
		if err != nil {
			t.Fatalf("GetByID() error = %v", err)
		}
		if retrieved.Status != domain.QueueStatusCompleted {
			t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusCompleted)
		}
		// ExitCode is a pointer on the stored command; nil means it was never
		// recorded, which is distinct from a recorded zero.
		if retrieved.ExitCode == nil || *retrieved.ExitCode != 0 {
			t.Errorf("ExitCode = %v, want 0", retrieved.ExitCode)
		}
		if retrieved.Output != "success output" {
			t.Errorf("Output = %q, want %q", retrieved.Output, "success output")
		}
	})
}
// TestCommandQueueRepository_GetByID verifies that GetByID returns the fields
// of a previously enqueued command and reports domain.ErrCommandNotFound for
// an ID that was never enqueued.
func TestCommandQueueRepository_GetByID(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()

	t.Run("gets existing command", func(t *testing.T) {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-getbyid",
			Command:     "get by id test",
			CommandType: domain.CommandTypeClaude,
			WorkingDir:  "/test/dir",
			Status:      domain.QueueStatusPending,
			Priority:    5,
			APIKeyID:    "key-getbyid",
		}
		// Without a successful enqueue there is no row to look up.
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		retrieved, err := repo.GetByID(ctx, cmd.ID)
		if err != nil {
			t.Fatalf("GetByID() error = %v", err)
		}
		if retrieved.Command != "get by id test" {
			t.Errorf("Command = %q, want %q", retrieved.Command, "get by id test")
		}
		if retrieved.WorkingDir != "/test/dir" {
			t.Errorf("WorkingDir = %q, want %q", retrieved.WorkingDir, "/test/dir")
		}
		if retrieved.Priority != 5 {
			t.Errorf("Priority = %d, want 5", retrieved.Priority)
		}
		if retrieved.APIKeyID != "key-getbyid" {
			t.Errorf("APIKeyID = %q, want %q", retrieved.APIKeyID, "key-getbyid")
		}
	})

	t.Run("returns error for non-existent command", func(t *testing.T) {
		// The all-zero UUID is used as an ID that no test ever enqueues.
		_, err := repo.GetByID(ctx, "00000000-0000-0000-0000-000000000000")
		if err != domain.ErrCommandNotFound {
			t.Errorf("GetByID() error = %v, want %v", err, domain.ErrCommandNotFound)
		}
	})
}
// TestCommandQueueRepository_List seeds five commands for one project and
// checks List with no filters, with a status filter, with a limit, and with
// ascending sort order.
func TestCommandQueueRepository_List(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()
	projectID := "test-proj-list"

	// Create test commands: even indexes are requested as completed, odd
	// indexes as pending.
	for i := 0; i < 5; i++ {
		status := domain.QueueStatusPending
		if i%2 == 0 {
			status = domain.QueueStatusCompleted
		}
		cmd := &domain.QueuedCommand{
			ProjectID:   projectID,
			Command:     "list test " + string(rune('a'+i)),
			CommandType: domain.CommandTypeClaude,
			Status:      status,
		}
		// Every subtest depends on this fixture, so stop on the first failure.
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() #%d error = %v", i, err)
		}
		time.Sleep(10 * time.Millisecond) // Ensure different timestamps
	}

	t.Run("lists all commands for project", func(t *testing.T) {
		commands, err := repo.List(ctx, projectID, nil)
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}
		if len(commands) < 5 {
			t.Errorf("List() returned %d commands, want at least 5", len(commands))
		}
	})

	t.Run("filters by status", func(t *testing.T) {
		status := domain.QueueStatusPending
		commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
			Status: &status,
			Limit:  100,
		})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}
		// Pending commands were seeded above; an empty result would let the
		// loop below pass without checking anything.
		if len(commands) == 0 {
			t.Error("List() returned no pending commands, want at least 1")
		}
		for _, cmd := range commands {
			if cmd.Status != domain.QueueStatusPending {
				t.Errorf("Command has Status = %q, want %q", cmd.Status, domain.QueueStatusPending)
			}
		}
	})

	t.Run("respects limit and offset", func(t *testing.T) {
		commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
			Limit:  2,
			Offset: 0,
		})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}
		if len(commands) != 2 {
			t.Errorf("List() returned %d commands, want 2", len(commands))
		}
	})

	t.Run("respects sort order", func(t *testing.T) {
		commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
			SortOrder: "asc",
			Limit:     100,
		})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}
		// Ordering cannot be verified with fewer than two rows.
		if len(commands) < 2 {
			t.Fatalf("List() returned %d commands, need at least 2 to verify ordering", len(commands))
		}
		// Check every adjacent pair, not just the first one.
		for i := 1; i < len(commands); i++ {
			if commands[i-1].CreatedAt.After(commands[i].CreatedAt) {
				t.Errorf("List() with asc sort order should return oldest first: index %d is newer than index %d", i-1, i)
			}
		}
	})
}
// TestCommandQueueRepository_Cancel verifies that Cancel marks a pending
// command as cancelled with CompletedAt set, rejects a command that has been
// moved to running, and returns domain.ErrCommandNotFound for an unknown ID.
func TestCommandQueueRepository_Cancel(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()

	t.Run("cancels pending command", func(t *testing.T) {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-cancel",
			Command:     "cancel test",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
		}
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		err := repo.Cancel(ctx, cmd.ID)
		if err != nil {
			t.Fatalf("Cancel() error = %v", err)
		}

		// Check the lookup error before dereferencing the result.
		retrieved, err := repo.GetByID(ctx, cmd.ID)
		if err != nil {
			t.Fatalf("GetByID() error = %v", err)
		}
		if retrieved.Status != domain.QueueStatusCancelled {
			t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusCancelled)
		}
		if retrieved.CompletedAt == nil {
			t.Error("CompletedAt should be set after cancel")
		}
	})

	t.Run("returns error for non-pending command", func(t *testing.T) {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-cancel-running",
			Command:     "cancel running test",
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
		}
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() error = %v", err)
		}

		// Make it running. If this step failed silently the command would
		// still be pending and the failure would be blamed on Cancel.
		if err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusRunning, nil); err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		err := repo.Cancel(ctx, cmd.ID)
		if err == nil {
			t.Error("Cancel() should return error for running command")
		}
	})

	t.Run("returns error for non-existent command", func(t *testing.T) {
		// The all-zero UUID is used as an ID that no test ever enqueues.
		err := repo.Cancel(ctx, "00000000-0000-0000-0000-000000000000")
		if err != domain.ErrCommandNotFound {
			t.Errorf("Cancel() error = %v, want %v", err, domain.ErrCommandNotFound)
		}
	})
}
// TestCommandQueueRepository_GetStats seeds commands with a mix of statuses
// for one project and checks that GetStats reports at least the seeded number
// of pending, completed and failed commands.
func TestCommandQueueRepository_GetStats(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()
	projectID := "test-proj-stats"

	// Create commands with different statuses.
	statuses := []domain.QueueStatus{
		domain.QueueStatusPending,
		domain.QueueStatusPending,
		domain.QueueStatusRunning,
		domain.QueueStatusCompleted,
		domain.QueueStatusFailed,
	}
	for i, status := range statuses {
		cmd := &domain.QueuedCommand{
			ProjectID:   projectID,
			Command:     "stats test " + string(rune('a'+i)),
			CommandType: domain.CommandTypeClaude,
			Status:      status,
		}
		// A failed seed would show up later as a misleading count mismatch.
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() #%d (%v) error = %v", i, status, err)
		}
	}

	t.Run("returns correct stats", func(t *testing.T) {
		stats, err := repo.GetStats(ctx, projectID)
		if err != nil {
			t.Fatalf("GetStats() error = %v", err)
		}
		// Lower bounds are used rather than exact counts.
		if stats.TotalPending < 2 {
			t.Errorf("TotalPending = %d, want at least 2", stats.TotalPending)
		}
		// Note: Running status is set during enqueue but some may be dequeued,
		// so the running count is deliberately not asserted here.
		if stats.TotalCompleted < 1 {
			t.Errorf("TotalCompleted = %d, want at least 1", stats.TotalCompleted)
		}
		if stats.TotalFailed < 1 {
			t.Errorf("TotalFailed = %d, want at least 1", stats.TotalFailed)
		}
	})
}
// TestCommandQueueRepository_CleanupOld creates three commands, marks them
// completed, runs CleanupOld, and checks that the call succeeds and that the
// freshly created commands are still retrievable afterwards.
func TestCommandQueueRepository_CleanupOld(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestQueue(t, db) })
	repo := NewCommandQueueRepository(db)
	ctx := context.Background()

	// Create completed commands (CleanupOld only removes terminal states).
	const seedCount = 3
	created := make([]*domain.QueuedCommand, 0, seedCount)
	for i := 0; i < seedCount; i++ {
		cmd := &domain.QueuedCommand{
			ProjectID:   "test-proj-cleanup",
			Command:     "cleanup test " + string(rune('a'+i)),
			CommandType: domain.CommandTypeClaude,
			Status:      domain.QueueStatusPending,
		}
		if err := repo.Enqueue(ctx, cmd); err != nil {
			t.Fatalf("Enqueue() #%d error = %v", i, err)
		}

		// Complete them.
		result := &domain.QueuedCommandResult{ExitCode: 0}
		if err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusCompleted, result); err != nil {
			t.Fatalf("UpdateStatus() #%d error = %v", i, err)
		}
		created = append(created, cmd)
	}

	t.Run("cleanup runs without error", func(t *testing.T) {
		// This won't delete newly created entries (they're not old enough).
		// NOTE(review): 30 is presumably a retention window in days; confirm
		// against CleanupOld's definition.
		deleted, err := repo.CleanupOld(ctx, 30)
		if err != nil {
			t.Fatalf("CleanupOld() error = %v", err)
		}

		// The deleted count is only logged, not asserted, because the call
		// takes no project ID and may remove rows this test did not create.
		if deleted != 0 {
			t.Logf("CleanupOld() deleted %d commands (expected 0 for new commands)", deleted)
		}

		// What must hold is that the commands created above survived.
		for _, cmd := range created {
			if _, err := repo.GetByID(ctx, cmd.ID); err != nil {
				t.Errorf("GetByID(%v) after CleanupOld() error = %v, want command to survive cleanup", cmd.ID, err)
			}
		}
	})
}