Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
488 lines
13 KiB
Go
488 lines
13 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/testutil"
|
|
)
|
|
|
|
func cleanupTestQueue(t *testing.T, db *sql.DB) {
|
|
t.Helper()
|
|
_, err := db.Exec("DELETE FROM command_queue WHERE project_id LIKE 'test-%'")
|
|
if err != nil {
|
|
t.Logf("cleanup test queue: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestCommandQueueRepository_Enqueue(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
t.Run("enqueues command successfully", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-enqueue-1",
|
|
Command: "explain this code",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 0,
|
|
APIKeyID: "key-1",
|
|
}
|
|
|
|
err := repo.Enqueue(ctx, cmd)
|
|
if err != nil {
|
|
t.Fatalf("Enqueue() error = %v", err)
|
|
}
|
|
|
|
if cmd.ID == "" {
|
|
t.Error("ID should be set after enqueue")
|
|
}
|
|
if cmd.CreatedAt.IsZero() {
|
|
t.Error("CreatedAt should be set after enqueue")
|
|
}
|
|
})
|
|
|
|
t.Run("enqueues with working directory", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-enqueue-2",
|
|
Command: "ls -la",
|
|
CommandType: domain.CommandTypeShell,
|
|
WorkingDir: "/tmp",
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 1,
|
|
}
|
|
|
|
err := repo.Enqueue(ctx, cmd)
|
|
if err != nil {
|
|
t.Fatalf("Enqueue() error = %v", err)
|
|
}
|
|
|
|
// Retrieve and verify
|
|
retrieved, err := repo.GetByID(ctx, cmd.ID)
|
|
if err != nil {
|
|
t.Fatalf("GetByID() error = %v", err)
|
|
}
|
|
|
|
if retrieved.WorkingDir != "/tmp" {
|
|
t.Errorf("WorkingDir = %q, want %q", retrieved.WorkingDir, "/tmp")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_Dequeue(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
t.Run("dequeues pending command", func(t *testing.T) {
|
|
// Create a pending command
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-dequeue-1",
|
|
Command: "test command",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 0,
|
|
}
|
|
_ = repo.Enqueue(ctx, cmd)
|
|
|
|
// Dequeue it
|
|
dequeued, err := repo.Dequeue(ctx, "test-proj-dequeue-1")
|
|
if err != nil {
|
|
t.Fatalf("Dequeue() error = %v", err)
|
|
}
|
|
|
|
if dequeued == nil {
|
|
t.Fatal("Dequeue() returned nil")
|
|
}
|
|
|
|
if dequeued.Status != domain.QueueStatusRunning {
|
|
t.Errorf("Status = %q, want %q", dequeued.Status, domain.QueueStatusRunning)
|
|
}
|
|
if dequeued.StartedAt == nil {
|
|
t.Error("StartedAt should be set after dequeue")
|
|
}
|
|
})
|
|
|
|
t.Run("returns nil when no pending commands", func(t *testing.T) {
|
|
dequeued, err := repo.Dequeue(ctx, "test-proj-dequeue-empty")
|
|
if err != nil {
|
|
t.Fatalf("Dequeue() error = %v", err)
|
|
}
|
|
|
|
if dequeued != nil {
|
|
t.Error("Dequeue() should return nil when no pending commands")
|
|
}
|
|
})
|
|
|
|
t.Run("dequeues highest priority first", func(t *testing.T) {
|
|
projectID := "test-proj-dequeue-priority"
|
|
|
|
// Create commands with different priorities
|
|
low := &domain.QueuedCommand{
|
|
ProjectID: projectID,
|
|
Command: "low priority",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 0,
|
|
}
|
|
_ = repo.Enqueue(ctx, low)
|
|
|
|
// Small delay to ensure different timestamps
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
high := &domain.QueuedCommand{
|
|
ProjectID: projectID,
|
|
Command: "high priority",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 10,
|
|
}
|
|
_ = repo.Enqueue(ctx, high)
|
|
|
|
// Dequeue should get high priority first
|
|
dequeued, err := repo.Dequeue(ctx, projectID)
|
|
if err != nil {
|
|
t.Fatalf("Dequeue() error = %v", err)
|
|
}
|
|
|
|
if dequeued.Command != "high priority" {
|
|
t.Errorf("Command = %q, want %q", dequeued.Command, "high priority")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_UpdateStatus(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
t.Run("updates status without result", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-status-1",
|
|
Command: "test",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
|
|
err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusRunning, nil)
|
|
if err != nil {
|
|
t.Fatalf("UpdateStatus() error = %v", err)
|
|
}
|
|
|
|
retrieved, _ := repo.GetByID(ctx, cmd.ID)
|
|
if retrieved.Status != domain.QueueStatusRunning {
|
|
t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusRunning)
|
|
}
|
|
})
|
|
|
|
t.Run("updates status with result", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-status-2",
|
|
Command: "test",
|
|
CommandType: domain.CommandTypeShell,
|
|
Status: domain.QueueStatusRunning,
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
|
|
result := &domain.QueuedCommandResult{
|
|
ExitCode: 0,
|
|
Output: "success output",
|
|
Error: "",
|
|
}
|
|
|
|
err := repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusCompleted, result)
|
|
if err != nil {
|
|
t.Fatalf("UpdateStatus() error = %v", err)
|
|
}
|
|
|
|
retrieved, _ := repo.GetByID(ctx, cmd.ID)
|
|
if retrieved.Status != domain.QueueStatusCompleted {
|
|
t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusCompleted)
|
|
}
|
|
if retrieved.ExitCode == nil || *retrieved.ExitCode != 0 {
|
|
t.Errorf("ExitCode = %v, want 0", retrieved.ExitCode)
|
|
}
|
|
if retrieved.Output != "success output" {
|
|
t.Errorf("Output = %q, want %q", retrieved.Output, "success output")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_GetByID(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
t.Run("gets existing command", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-getbyid",
|
|
Command: "get by id test",
|
|
CommandType: domain.CommandTypeClaude,
|
|
WorkingDir: "/test/dir",
|
|
Status: domain.QueueStatusPending,
|
|
Priority: 5,
|
|
APIKeyID: "key-getbyid",
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
|
|
retrieved, err := repo.GetByID(ctx, cmd.ID)
|
|
if err != nil {
|
|
t.Fatalf("GetByID() error = %v", err)
|
|
}
|
|
|
|
if retrieved.Command != "get by id test" {
|
|
t.Errorf("Command = %q, want %q", retrieved.Command, "get by id test")
|
|
}
|
|
if retrieved.Priority != 5 {
|
|
t.Errorf("Priority = %d, want 5", retrieved.Priority)
|
|
}
|
|
if retrieved.APIKeyID != "key-getbyid" {
|
|
t.Errorf("APIKeyID = %q, want %q", retrieved.APIKeyID, "key-getbyid")
|
|
}
|
|
})
|
|
|
|
t.Run("returns error for non-existent command", func(t *testing.T) {
|
|
_, err := repo.GetByID(ctx, "00000000-0000-0000-0000-000000000000")
|
|
if err != domain.ErrCommandNotFound {
|
|
t.Errorf("GetByID() error = %v, want %v", err, domain.ErrCommandNotFound)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_List(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
projectID := "test-proj-list"
|
|
|
|
// Create test commands
|
|
for i := 0; i < 5; i++ {
|
|
status := domain.QueueStatusPending
|
|
if i%2 == 0 {
|
|
status = domain.QueueStatusCompleted
|
|
}
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: projectID,
|
|
Command: "list test " + string(rune('a'+i)),
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: status,
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
time.Sleep(10 * time.Millisecond) // Ensure different timestamps
|
|
}
|
|
|
|
t.Run("lists all commands for project", func(t *testing.T) {
|
|
commands, err := repo.List(ctx, projectID, nil)
|
|
if err != nil {
|
|
t.Fatalf("List() error = %v", err)
|
|
}
|
|
|
|
if len(commands) < 5 {
|
|
t.Errorf("List() returned %d commands, want at least 5", len(commands))
|
|
}
|
|
})
|
|
|
|
t.Run("filters by status", func(t *testing.T) {
|
|
status := domain.QueueStatusPending
|
|
commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
|
|
Status: &status,
|
|
Limit: 100,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("List() error = %v", err)
|
|
}
|
|
|
|
for _, cmd := range commands {
|
|
if cmd.Status != domain.QueueStatusPending {
|
|
t.Errorf("Command has Status = %q, want %q", cmd.Status, domain.QueueStatusPending)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("respects limit and offset", func(t *testing.T) {
|
|
commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
|
|
Limit: 2,
|
|
Offset: 0,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("List() error = %v", err)
|
|
}
|
|
|
|
if len(commands) != 2 {
|
|
t.Errorf("List() returned %d commands, want 2", len(commands))
|
|
}
|
|
})
|
|
|
|
t.Run("respects sort order", func(t *testing.T) {
|
|
commands, err := repo.List(ctx, projectID, &domain.QueueFilters{
|
|
SortOrder: "asc",
|
|
Limit: 100,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("List() error = %v", err)
|
|
}
|
|
|
|
if len(commands) >= 2 {
|
|
if commands[0].CreatedAt.After(commands[1].CreatedAt) {
|
|
t.Error("List() with asc sort order should return oldest first")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_Cancel(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
t.Run("cancels pending command", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-cancel",
|
|
Command: "cancel test",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
|
|
err := repo.Cancel(ctx, cmd.ID)
|
|
if err != nil {
|
|
t.Fatalf("Cancel() error = %v", err)
|
|
}
|
|
|
|
retrieved, _ := repo.GetByID(ctx, cmd.ID)
|
|
if retrieved.Status != domain.QueueStatusCancelled {
|
|
t.Errorf("Status = %q, want %q", retrieved.Status, domain.QueueStatusCancelled)
|
|
}
|
|
if retrieved.CompletedAt == nil {
|
|
t.Error("CompletedAt should be set after cancel")
|
|
}
|
|
})
|
|
|
|
t.Run("returns error for non-pending command", func(t *testing.T) {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-cancel-running",
|
|
Command: "cancel running test",
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
}
|
|
_ = repo.Enqueue(ctx, cmd)
|
|
|
|
// Make it running
|
|
_ = repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusRunning, nil)
|
|
|
|
err := repo.Cancel(ctx, cmd.ID)
|
|
if err == nil {
|
|
t.Error("Cancel() should return error for running command")
|
|
}
|
|
})
|
|
|
|
t.Run("returns error for non-existent command", func(t *testing.T) {
|
|
err := repo.Cancel(ctx, "00000000-0000-0000-0000-000000000000")
|
|
if err != domain.ErrCommandNotFound {
|
|
t.Errorf("Cancel() error = %v, want %v", err, domain.ErrCommandNotFound)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_GetStats(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
projectID := "test-proj-stats"
|
|
|
|
// Create commands with different statuses
|
|
statuses := []domain.QueueStatus{
|
|
domain.QueueStatusPending,
|
|
domain.QueueStatusPending,
|
|
domain.QueueStatusRunning,
|
|
domain.QueueStatusCompleted,
|
|
domain.QueueStatusFailed,
|
|
}
|
|
|
|
for i, status := range statuses {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: projectID,
|
|
Command: "stats test " + string(rune('a'+i)),
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: status,
|
|
}
|
|
repo.Enqueue(ctx, cmd)
|
|
}
|
|
|
|
t.Run("returns correct stats", func(t *testing.T) {
|
|
stats, err := repo.GetStats(ctx, projectID)
|
|
if err != nil {
|
|
t.Fatalf("GetStats() error = %v", err)
|
|
}
|
|
|
|
if stats.TotalPending < 2 {
|
|
t.Errorf("TotalPending = %d, want at least 2", stats.TotalPending)
|
|
}
|
|
// Note: Running status is set during enqueue but some may be dequeued
|
|
if stats.TotalCompleted < 1 {
|
|
t.Errorf("TotalCompleted = %d, want at least 1", stats.TotalCompleted)
|
|
}
|
|
if stats.TotalFailed < 1 {
|
|
t.Errorf("TotalFailed = %d, want at least 1", stats.TotalFailed)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCommandQueueRepository_CleanupOld(t *testing.T) {
|
|
db := testutil.TestDB(t)
|
|
t.Cleanup(func() { cleanupTestQueue(t, db) })
|
|
|
|
repo := NewCommandQueueRepository(db)
|
|
ctx := context.Background()
|
|
|
|
// Create completed commands (CleanupOld only removes terminal states)
|
|
for i := 0; i < 3; i++ {
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: "test-proj-cleanup",
|
|
Command: "cleanup test " + string(rune('a'+i)),
|
|
CommandType: domain.CommandTypeClaude,
|
|
Status: domain.QueueStatusPending,
|
|
}
|
|
_ = repo.Enqueue(ctx, cmd)
|
|
|
|
// Complete them
|
|
result := &domain.QueuedCommandResult{ExitCode: 0}
|
|
_ = repo.UpdateStatus(ctx, cmd.ID, domain.QueueStatusCompleted, result)
|
|
}
|
|
|
|
t.Run("cleanup runs without error", func(t *testing.T) {
|
|
// This won't delete newly created entries (they're not old enough)
|
|
// but we verify the function executes without error
|
|
deleted, err := repo.CleanupOld(ctx, 30)
|
|
if err != nil {
|
|
t.Fatalf("CleanupOld() error = %v", err)
|
|
}
|
|
|
|
// Newly created commands shouldn't be deleted
|
|
if deleted != 0 {
|
|
t.Logf("CleanupOld() deleted %d commands (expected 0 for new commands)", deleted)
|
|
}
|
|
})
|
|
}
|