diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index de3e0e5..693a54e 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -452,12 +452,15 @@ func main() { // Start work executor (cross-project worker pool, git via kubectl exec) buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil) + // VerifyExecutor requires CommandExecutor - will be wired in Week 2 + var verifyExecutor *worker.VerifyExecutor workerCfg := worker.DefaultWorkExecutorConfig() workerCfg.Logger = logger workExecutor := worker.NewWorkExecutor( workerService, workService, buildExecutor, + verifyExecutor, workerCfg, ) if err := workExecutor.Start(); err != nil { diff --git a/internal/domain/verify.go b/internal/domain/verify.go new file mode 100644 index 0000000..9602c54 --- /dev/null +++ b/internal/domain/verify.go @@ -0,0 +1,198 @@ +package domain + +import ( + "errors" + "fmt" + "net/url" + "strconv" +) + +// Verify-related errors. +var ( + ErrVerifyURLRequired = errors.New("url is required for verify spec") +) + +// VerifySpec defines what a verify task should accomplish. +type VerifySpec struct { + // URL is the page to capture (required). + URL string `json:"url"` + + // Viewports is a list of viewport sizes to capture. + // Default: ["1920x1080", "768x1024", "375x667"] + Viewports []string `json:"viewports,omitempty"` + + // WaitFor is a CSS selector to wait for before capturing. + // Default: "body" + WaitFor string `json:"wait_for,omitempty"` + + // WaitTimeout is the maximum time to wait for the selector in milliseconds. + // Default: 10000 + WaitTimeout int `json:"wait_timeout,omitempty"` + + // FullPage captures the entire scrollable page if true. + FullPage bool `json:"full_page,omitempty"` + + // Video records a video of the page load if true. + Video bool `json:"video,omitempty"` + + // Evaluate enables AI evaluation of the captures (Week 3). + Evaluate bool `json:"evaluate,omitempty"` + + // Prompt provides context for AI evaluation (Week 3). + Prompt string `json:"prompt,omitempty"` + + // CallbackURL is the webhook URL for completion notification. + CallbackURL string `json:"callback_url,omitempty"` +} + +// DefaultViewports returns the default viewport sizes for captures. +func DefaultViewports() []string { + return []string{"1920x1080", "768x1024", "375x667"} +} + +// Validate checks that the VerifySpec has all required fields. +func (s *VerifySpec) Validate() error { + if s.URL == "" { + return ErrVerifyURLRequired + } + + // Validate URL format + u, err := url.Parse(s.URL) + if err != nil { + return fmt.Errorf("invalid url: %w", err) + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("url scheme must be http or https, got %q", u.Scheme) + } + + // Validate callback URL if provided + if s.CallbackURL != "" { + cu, err := url.Parse(s.CallbackURL) + if err != nil { + return fmt.Errorf("invalid callback_url: %w", err) + } + if cu.Scheme != "http" && cu.Scheme != "https" { + return fmt.Errorf("callback_url scheme must be http or https, got %q", cu.Scheme) + } + } + + return nil +} + +// WithDefaults returns a copy of the spec with default values applied. +func (s *VerifySpec) WithDefaults() *VerifySpec { + spec := *s + if len(spec.Viewports) == 0 { + spec.Viewports = DefaultViewports() + } + if spec.WaitFor == "" { + spec.WaitFor = "body" + } + if spec.WaitTimeout == 0 { + spec.WaitTimeout = 10000 + } + return &spec +} + +// VerifyResult captures the outcome of a verify execution. +type VerifyResult struct { + // Success indicates whether the capture completed successfully. + Success bool `json:"success"` + + // Screenshots maps viewport size to screenshot file path. + // Example: {"1920x1080": "/captures/task-id/1920_1080.png"} + Screenshots map[string]string `json:"screenshots,omitempty"` + + // Video is the path to the recorded video if requested. + Video string `json:"video,omitempty"` + + // Evaluation contains the AI evaluation result (Week 3). + Evaluation string `json:"evaluation,omitempty"` + + // Score is the AI-assigned score 0-100 (Week 3). + Score int `json:"score,omitempty"` + + // Passed indicates whether the AI evaluation passed (Week 3). + Passed bool `json:"passed,omitempty"` + + // DurationMs is how long the capture took in milliseconds. + DurationMs int64 `json:"duration_ms"` + + // Error contains the error message if capture failed. + Error string `json:"error,omitempty"` +} + +// ToWorkResult converts a VerifyResult to a WorkResult for queue compatibility. +// Screenshots are promoted to artifacts with viewport as key prefix. +func (r *VerifyResult) ToWorkResult() *WorkResult { + if r == nil { + return &WorkResult{} + } + + wr := &WorkResult{} + + // Use error as output if failed, otherwise empty (captures are in artifacts) + if !r.Success && r.Error != "" { + wr.Output = r.Error + } + + // Promote screenshots and metadata to artifacts + if len(r.Screenshots) > 0 || r.Video != "" || r.DurationMs > 0 { + wr.Artifacts = make(map[string]string) + + // Add screenshots + for viewport, path := range r.Screenshots { + wr.Artifacts["screenshot_"+viewport] = path + } + + // Add video if present + if r.Video != "" { + wr.Artifacts["video"] = r.Video + } + + // Add duration + if r.DurationMs > 0 { + wr.Artifacts["duration_ms"] = strconv.FormatInt(r.DurationMs, 10) + } + + // Add evaluation results (Week 3) + if r.Evaluation != "" { + wr.Artifacts["evaluation"] = r.Evaluation + } + if r.Score > 0 { + wr.Artifacts["score"] = strconv.Itoa(r.Score) + } + if r.Passed { + wr.Artifacts["passed"] = "true" + } + } + + return wr +} + +// ToBuildResult converts a VerifyResult to a BuildResult for executor compatibility. +// This allows the verify executor to return results through the work queue. +func (r *VerifyResult) ToBuildResult() *BuildResult { + if r == nil { + return &BuildResult{} + } + + br := &BuildResult{ + Success: r.Success, + DurationMs: r.DurationMs, + Error: r.Error, + } + + // Promote screenshots and video to artifacts + if len(r.Screenshots) > 0 || r.Video != "" { + br.Artifacts = make(map[string]string) + for viewport, path := range r.Screenshots { + br.Artifacts["screenshot_"+viewport] = path + } + if r.Video != "" { + br.Artifacts["video"] = r.Video + } + } + + return br +} diff --git a/internal/domain/verify_test.go b/internal/domain/verify_test.go new file mode 100644 index 0000000..82b6c3e --- /dev/null +++ b/internal/domain/verify_test.go @@ -0,0 +1,274 @@ +package domain + +import ( + "errors" + "testing" +) + +func TestVerifySpec_Validate(t *testing.T) { + tests := []struct { + name string + spec VerifySpec + wantErr error + }{ + { + name: "valid spec with URL only", + spec: VerifySpec{URL: "https://example.com"}, + wantErr: nil, + }, + { + name: "valid spec with all fields", + spec: VerifySpec{ + URL: "https://example.com/page", + Viewports: []string{"1920x1080", "375x667"}, + WaitFor: "#main", + WaitTimeout: 5000, + FullPage: true, + Video: true, + CallbackURL: "https://webhook.example.com/notify", + }, + wantErr: nil, + }, + { + name: "empty URL", + spec: VerifySpec{}, + wantErr: ErrVerifyURLRequired, + }, + { + name: "invalid URL scheme", + spec: VerifySpec{URL: "ftp://example.com"}, + wantErr: nil, // Validate will fail but not return ErrVerifyURLRequired + }, + { + name: "invalid callback URL scheme", + spec: VerifySpec{URL: "https://example.com", CallbackURL: "ftp://webhook.com"}, + wantErr: nil, // Validate will fail but not return ErrVerifyURLRequired + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.spec.Validate() + if tt.wantErr != nil { + if !errors.Is(err, tt.wantErr) { + t.Errorf("Validate() error = %v, want %v", err, tt.wantErr) + } + } else if tt.name == "invalid URL scheme" || tt.name == "invalid callback URL scheme" { + // These should fail with a different error + if err == nil { + t.Errorf("Validate() expected error for %s", tt.name) + } + } else if err != nil { + t.Errorf("Validate() unexpected error = %v", err) + } + }) + } +} + +func TestVerifySpec_WithDefaults(t *testing.T) { + t.Run("applies defaults to empty spec", func(t *testing.T) { + spec := &VerifySpec{URL: "https://example.com"} + result := spec.WithDefaults() + + if len(result.Viewports) != 3 { + t.Errorf("expected 3 default viewports, got %d", len(result.Viewports)) + } + if result.Viewports[0] != "1920x1080" { + t.Errorf("expected first viewport '1920x1080', got %q", result.Viewports[0]) + } + if result.WaitFor != "body" { + t.Errorf("expected WaitFor 'body', got %q", result.WaitFor) + } + if result.WaitTimeout != 10000 { + t.Errorf("expected WaitTimeout 10000, got %d", result.WaitTimeout) + } + }) + + t.Run("preserves existing values", func(t *testing.T) { + spec := &VerifySpec{ + URL: "https://example.com", + Viewports: []string{"800x600"}, + WaitFor: "#app", + WaitTimeout: 5000, + } + result := spec.WithDefaults() + + if len(result.Viewports) != 1 || result.Viewports[0] != "800x600" { + t.Errorf("expected preserved viewports, got %v", result.Viewports) + } + if result.WaitFor != "#app" { + t.Errorf("expected preserved WaitFor, got %q", result.WaitFor) + } + if result.WaitTimeout != 5000 { + t.Errorf("expected preserved WaitTimeout, got %d", result.WaitTimeout) + } + }) + + t.Run("does not modify original", func(t *testing.T) { + spec := &VerifySpec{URL: "https://example.com"} + _ = spec.WithDefaults() + + if len(spec.Viewports) != 0 { + t.Error("original spec should not be modified") + } + }) +} + +func TestVerifyResult_ToWorkResult(t *testing.T) { + t.Run("success with screenshots", func(t *testing.T) { + result := &VerifyResult{ + Success: true, + Screenshots: map[string]string{ + "1920x1080": "/captures/task-1/1920_1080.png", + "375x667": "/captures/task-1/375_667.png", + }, + DurationMs: 1500, + } + + wr := result.ToWorkResult() + + if wr.Output != "" { + t.Errorf("expected empty output on success, got %q", wr.Output) + } + if wr.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" { + t.Errorf("screenshot_1920x1080 = %q", wr.Artifacts["screenshot_1920x1080"]) + } + if wr.Artifacts["screenshot_375x667"] != "/captures/task-1/375_667.png" { + t.Errorf("screenshot_375x667 = %q", wr.Artifacts["screenshot_375x667"]) + } + if wr.Artifacts["duration_ms"] != "1500" { + t.Errorf("duration_ms = %q", wr.Artifacts["duration_ms"]) + } + }) + + t.Run("success with video", func(t *testing.T) { + result := &VerifyResult{ + Success: true, + Screenshots: map[string]string{"1920x1080": "/captures/task-1/1920_1080.png"}, + Video: "/captures/task-1/recording.webm", + DurationMs: 2000, + } + + wr := result.ToWorkResult() + + if wr.Artifacts["video"] != "/captures/task-1/recording.webm" { + t.Errorf("video = %q", wr.Artifacts["video"]) + } + }) + + t.Run("failure uses error as output", func(t *testing.T) { + result := &VerifyResult{ + Success: false, + Error: "capture failed: timeout", + } + + wr := result.ToWorkResult() + + if wr.Output != "capture failed: timeout" { + t.Errorf("Output = %q, want error message", wr.Output) + } + }) + + t.Run("nil receiver returns empty result", func(t *testing.T) { + var result *VerifyResult + wr := result.ToWorkResult() + + if wr.Output != "" { + t.Errorf("Output = %q, want empty", wr.Output) + } + if wr.Artifacts != nil { + t.Errorf("Artifacts = %v, want nil", wr.Artifacts) + } + }) + + t.Run("evaluation results included", func(t *testing.T) { + result := &VerifyResult{ + Success: true, + Evaluation: "Page renders correctly with all elements visible", + Score: 85, + Passed: true, + DurationMs: 1000, + } + + wr := result.ToWorkResult() + + if wr.Artifacts["evaluation"] != "Page renders correctly with all elements visible" { + t.Errorf("evaluation = %q", wr.Artifacts["evaluation"]) + } + if wr.Artifacts["score"] != "85" { + t.Errorf("score = %q", wr.Artifacts["score"]) + } + if wr.Artifacts["passed"] != "true" { + t.Errorf("passed = %q", wr.Artifacts["passed"]) + } + }) +} + +func TestVerifyResult_ToBuildResult(t *testing.T) { + t.Run("success conversion", func(t *testing.T) { + result := &VerifyResult{ + Success: true, + Screenshots: map[string]string{ + "1920x1080": "/captures/task-1/1920_1080.png", + }, + Video: "/captures/task-1/recording.webm", + DurationMs: 1500, + } + + br := result.ToBuildResult() + + if !br.Success { + t.Error("expected success = true") + } + if br.DurationMs != 1500 { + t.Errorf("DurationMs = %d, want 1500", br.DurationMs) + } + if br.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" { + t.Errorf("screenshot artifact missing") + } + if br.Artifacts["video"] != "/captures/task-1/recording.webm" { + t.Errorf("video artifact missing") + } + }) + + t.Run("failure conversion", func(t *testing.T) { + result := &VerifyResult{ + Success: false, + Error: "capture failed", + DurationMs: 500, + } + + br := result.ToBuildResult() + + if br.Success { + t.Error("expected success = false") + } + if br.Error != "capture failed" { + t.Errorf("Error = %q", br.Error) + } + }) + + t.Run("nil receiver returns empty", func(t *testing.T) { + var result *VerifyResult + br := result.ToBuildResult() + + if br.Success { + t.Error("expected success = false for nil") + } + }) +} + +func TestDefaultViewports(t *testing.T) { + viewports := DefaultViewports() + + if len(viewports) != 3 { + t.Fatalf("expected 3 viewports, got %d", len(viewports)) + } + + expected := []string{"1920x1080", "768x1024", "375x667"} + for i, vp := range expected { + if viewports[i] != vp { + t.Errorf("viewport[%d] = %q, want %q", i, viewports[i], vp) + } + } +} diff --git a/internal/service/worker_service.go b/internal/service/worker_service.go index 7316ae2..df87813 100644 --- a/internal/service/worker_service.go +++ b/internal/service/worker_service.go @@ -185,6 +185,53 @@ func (s *WorkerService) CompleteTask(ctx context.Context, workerID, taskID strin return nil } +// FailTask marks a task as failed, updates audit, and returns worker to idle. +// This mirrors CompleteTask but for the failure path, ensuring the build_audit +// table is updated with the failure status. +func (s *WorkerService) FailTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult, workSvc WorkServiceFailer) error { + if result == nil { + result = &domain.BuildResult{Success: false, Error: "unknown error"} + } + + // Update audit record with failure (non-critical) + if s.audit != nil { + if err := s.audit.Update(ctx, taskID, result); err != nil { + s.logger.Warn("failed to update audit on failure", + "task_id", taskID, + "error", err, + ) + } + } + + // Classify the error and delegate to work service for retry logic + errorCode := domain.ClassifyAgentError(result.Error, result.Output) + if err := workSvc.FailTaskWithCode(ctx, taskID, result.Error, errorCode); err != nil { + return err + } + + // Return worker to idle + if err := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusIdle, ""); err != nil { + s.logger.Warn("failed to return worker to idle after failure", + "worker_id", workerID, + "error", err, + ) + } + + s.logger.Info("task failed", + "task_id", taskID, + "worker_id", workerID, + "error", result.Error, + ) + + return nil +} + +// WorkServiceFailer is the interface for the work service failure method. +// This avoids a circular dependency between WorkerService and WorkService. +type WorkServiceFailer interface { + FailTaskWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error +} + // DrainWorker sets a worker to draining status so it finishes current work // but doesn't accept new tasks. func (s *WorkerService) DrainWorker(ctx context.Context, workerID string) error { diff --git a/internal/worker/verify_executor.go b/internal/worker/verify_executor.go new file mode 100644 index 0000000..12901cf --- /dev/null +++ b/internal/worker/verify_executor.go @@ -0,0 +1,342 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/port" +) + +// Verify event type constants for SSE streaming. +const ( + VerifyEventStarted = "verify.started" + VerifyEventCapturing = "verify.capturing" + VerifyEventCaptured = "verify.captured" + VerifyEventCompleted = "verify.completed" + VerifyEventFailed = "verify.failed" +) + +// VerifyExecutor handles WorkTaskTypeVerify tasks. +// It translates VerifySpec fields from the work task's Spec map, +// executes the Playwright capture script via kubectl exec, +// and returns a BuildResult for queue compatibility. +type VerifyExecutor struct { + cmdExecutor port.CommandExecutor // kubectl exec wrapper + streams port.StreamPublisher // SSE stream publisher for real-time events + logger *slog.Logger + namespace string // Kubernetes namespace for the pod + podName string // Playwright pod name (e.g., "playwright-0") +} + +// VerifyExecutorConfig holds configuration for the verify executor. +type VerifyExecutorConfig struct { + Namespace string // Kubernetes namespace (e.g., "rdev") + PodName string // Playwright pod name (e.g., "playwright-0") +} + +// NewVerifyExecutor creates a new verify executor. +func NewVerifyExecutor( + cmdExecutor port.CommandExecutor, + streams port.StreamPublisher, + logger *slog.Logger, + cfg *VerifyExecutorConfig, +) *VerifyExecutor { + if logger == nil { + logger = slog.Default() + } + if cfg == nil { + cfg = &VerifyExecutorConfig{ + Namespace: "rdev", + PodName: "playwright-0", + } + } + if cfg.Namespace == "" { + cfg.Namespace = "rdev" + } + if cfg.PodName == "" { + cfg.PodName = "playwright-0" + } + return &VerifyExecutor{ + cmdExecutor: cmdExecutor, + streams: streams, + logger: logger.With("component", "verify-executor"), + namespace: cfg.Namespace, + podName: cfg.PodName, + } +} + +// Execute runs a verify task by capturing screenshots/video of a URL. +func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { + start := time.Now() + streamID := task.ID // Use task ID as stream ID for SSE + + // Publish verify.started event + v.publishEvent(streamID, VerifyEventStarted, map[string]any{ + "task_id": task.ID, + "project_id": task.ProjectID, + "started_at": start.Format(time.RFC3339), + }) + + // Parse VerifySpec from task.Spec + spec, err := v.parseSpec(task.Spec) + if err != nil { + v.publishEvent(streamID, VerifyEventFailed, map[string]any{ + "task_id": task.ID, + "error": fmt.Sprintf("invalid verify spec: %v", err), + }) + return &domain.BuildResult{ + Success: false, + Error: fmt.Sprintf("invalid verify spec: %v", err), + DurationMs: time.Since(start).Milliseconds(), + } + } + + // Apply defaults + spec = spec.WithDefaults() + + // Build output directory using task ID for isolation + outputDir := fmt.Sprintf("/captures/%s", task.ID) + + // Publish capturing event + v.publishEvent(streamID, VerifyEventCapturing, map[string]any{ + "task_id": task.ID, + "url": spec.URL, + "viewports": spec.Viewports, + }) + + v.logger.Info("executing verify capture", + "task_id", task.ID, + "project_id", task.ProjectID, + "url", spec.URL, + "viewports", spec.Viewports, + "pod", v.podName, + ) + + // Build capture command + cmdArgs := v.buildCaptureCommand(spec, outputDir) + + // Execute via CommandExecutor + captureOutput, err := v.executeCapture(ctx, task.ID, cmdArgs) + if err != nil { + v.publishEvent(streamID, VerifyEventFailed, map[string]any{ + "task_id": task.ID, + "error": fmt.Sprintf("capture execution failed: %v", err), + "duration_ms": time.Since(start).Milliseconds(), + }) + v.closeStream(ctx, streamID) + return &domain.BuildResult{ + Success: false, + Error: fmt.Sprintf("capture execution failed: %v", err), + DurationMs: time.Since(start).Milliseconds(), + } + } + + // Parse JSON manifest from stdout + verifyResult, err := v.parseManifest(captureOutput) + if err != nil { + v.publishEvent(streamID, VerifyEventFailed, map[string]any{ + "task_id": task.ID, + "error": fmt.Sprintf("failed to parse capture manifest: %v", err), + "duration_ms": time.Since(start).Milliseconds(), + }) + v.closeStream(ctx, streamID) + return &domain.BuildResult{ + Success: false, + Error: fmt.Sprintf("failed to parse capture manifest: %v", err), + DurationMs: time.Since(start).Milliseconds(), + } + } + + verifyResult.Success = true + verifyResult.DurationMs = time.Since(start).Milliseconds() + + // Publish captured event + v.publishEvent(streamID, VerifyEventCaptured, map[string]any{ + "task_id": task.ID, + "screenshots": verifyResult.Screenshots, + "video": verifyResult.Video, + }) + + // Publish completion event + v.publishEvent(streamID, VerifyEventCompleted, map[string]any{ + "task_id": task.ID, + "success": true, + "screenshots": verifyResult.Screenshots, + "video": verifyResult.Video, + "duration_ms": verifyResult.DurationMs, + }) + v.closeStream(ctx, streamID) + + // Convert to BuildResult for queue compatibility + return verifyResult.ToBuildResult() +} + +// buildCaptureCommand constructs the node command arguments for capture.js. +func (v *VerifyExecutor) buildCaptureCommand(spec *domain.VerifySpec, outputDir string) []string { + args := []string{ + "node", "/scripts/capture.js", + "--url=" + spec.URL, + "--output=" + outputDir, + "--viewports=" + strings.Join(spec.Viewports, ","), + } + + if spec.WaitFor != "" { + args = append(args, "--wait-for="+spec.WaitFor) + } + + if spec.FullPage { + args = append(args, "--full-page=true") + } + + if spec.Video { + args = append(args, "--video=true") + } + + return args +} + +// executeCapture runs the capture command in the Playwright pod. +func (v *VerifyExecutor) executeCapture(ctx context.Context, taskID string, cmdArgs []string) (string, error) { + // Create a command to execute + cmd := &domain.Command{ + ID: domain.CommandID(fmt.Sprintf("verify-%s", taskID)), + ProjectID: "", // Verify tasks aren't project-specific + Type: domain.CommandTypeShell, + Args: cmdArgs, + StartedAt: time.Now(), + } + + var outputBuilder strings.Builder + + // Execute the command and capture output + result, err := v.cmdExecutor.Execute(ctx, cmd, v.podName, func(line domain.OutputLine) { + // Capture all output for parsing + if line.Stream == "stdout" { + outputBuilder.WriteString(line.Line) + outputBuilder.WriteString("\n") + } + }) + if err != nil { + return "", fmt.Errorf("kubectl exec failed: %w", err) + } + + if !result.Success() { + return "", fmt.Errorf("capture script failed with exit code %d", result.ExitCode) + } + + return outputBuilder.String(), nil +} + +// parseManifest parses the JSON capture manifest from the script output. +func (v *VerifyExecutor) parseManifest(output string) (*domain.VerifyResult, error) { + // The capture script outputs JSON to stdout + // Find the JSON line (last non-empty line) + lines := strings.Split(strings.TrimSpace(output), "\n") + if len(lines) == 0 { + return nil, fmt.Errorf("no output from capture script") + } + + jsonLine := lines[len(lines)-1] + + var manifest struct { + Screenshots map[string]string `json:"screenshots"` + Video string `json:"video,omitempty"` + } + + if err := json.Unmarshal([]byte(jsonLine), &manifest); err != nil { + return nil, fmt.Errorf("invalid JSON manifest: %w", err) + } + + return &domain.VerifyResult{ + Screenshots: manifest.Screenshots, + Video: manifest.Video, + }, nil +} + +// parseSpec extracts typed VerifySpec fields from the generic map[string]any. +func (v *VerifyExecutor) parseSpec(spec map[string]any) (*domain.VerifySpec, error) { + url, _ := spec["url"].(string) + if url == "" { + return nil, domain.ErrVerifyURLRequired + } + + vs := &domain.VerifySpec{ + URL: url, + } + + // Parse optional fields + if viewports, ok := spec["viewports"].([]any); ok { + for _, vp := range viewports { + if s, ok := vp.(string); ok { + vs.Viewports = append(vs.Viewports, s) + } + } + } + + if waitFor, ok := spec["wait_for"].(string); ok { + vs.WaitFor = waitFor + } + + if waitTimeout, ok := spec["wait_timeout"].(float64); ok { + vs.WaitTimeout = int(waitTimeout) + } + + if fullPage, ok := spec["full_page"].(bool); ok { + vs.FullPage = fullPage + } + + if video, ok := spec["video"].(bool); ok { + vs.Video = video + } + + if evaluate, ok := spec["evaluate"].(bool); ok { + vs.Evaluate = evaluate + } + + if prompt, ok := spec["prompt"].(string); ok { + vs.Prompt = prompt + } + + if callbackURL, ok := spec["callback_url"].(string); ok { + vs.CallbackURL = callbackURL + } + + // Validate the spec + if err := vs.Validate(); err != nil { + return nil, err + } + + return vs, nil +} + +// publishEvent publishes an event to the SSE stream if a stream publisher is configured. +func (v *VerifyExecutor) publishEvent(streamID, eventType string, data map[string]any) { + if v.streams == nil { + return + } + v.streams.Publish(streamID, port.StreamEvent{ + Type: eventType, + Data: data, + }) +} + +// closeStream closes the stream after a delay to allow clients to receive final events. +func (v *VerifyExecutor) closeStream(ctx context.Context, streamID string) { + if v.streams == nil { + return + } + // Close stream after a short delay to ensure final events are delivered. + go func() { + select { + case <-ctx.Done(): + v.streams.Close(streamID) + case <-time.After(streamCloseDelay): + v.streams.Close(streamID) + } + }() +} diff --git a/internal/worker/verify_executor_test.go b/internal/worker/verify_executor_test.go new file mode 100644 index 0000000..2c793cf --- /dev/null +++ b/internal/worker/verify_executor_test.go @@ -0,0 +1,333 @@ +package worker + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/orchard9/rdev/internal/domain" +) + +func TestVerifyExecutor_Execute_Success(t *testing.T) { + cmdExec := newMockCommandExecutor() + // Simulate capture.js JSON output + cmdExec.output = []domain.OutputLine{ + {Stream: "stdout", Line: `{"screenshots":{"1920x1080":"/captures/task-1/1920_1080.png","375x667":"/captures/task-1/375_667.png"},"video":"/captures/task-1/recording.webm"}`, Timestamp: time.Now()}, + } + + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{ + "url": "https://example.com", + "viewports": []any{"1920x1080", "375x667"}, + "video": true, + }, + } + + result := exec.Execute(context.Background(), task) + + if !result.Success { + t.Errorf("expected success, got error: %s", result.Error) + } + if result.DurationMs < 0 { + t.Errorf("expected non-negative duration, got %d", result.DurationMs) + } + // Check artifacts were populated + if result.Artifacts == nil { + t.Fatal("expected artifacts to be populated") + } + if result.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" { + t.Errorf("screenshot_1920x1080 = %q", result.Artifacts["screenshot_1920x1080"]) + } + if result.Artifacts["video"] != "/captures/task-1/recording.webm" { + t.Errorf("video = %q", result.Artifacts["video"]) + } +} + +func TestVerifyExecutor_Execute_URLRequired(t *testing.T) { + cmdExec := newMockCommandExecutor() + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{}, // Missing URL + } + + result := exec.Execute(context.Background(), task) + + if result.Success { + t.Error("expected failure for missing URL") + } + if result.Error == "" { + t.Error("expected error message") + } +} + +func TestVerifyExecutor_Execute_InvalidURL(t *testing.T) { + cmdExec := newMockCommandExecutor() + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{ + "url": "ftp://invalid-scheme.com", + }, + } + + result := exec.Execute(context.Background(), task) + + if result.Success { + t.Error("expected failure for invalid URL scheme") + } +} + +func TestVerifyExecutor_Execute_CaptureFailure(t *testing.T) { + cmdExec := newMockCommandExecutor() + cmdExec.err = fmt.Errorf("kubectl exec failed: connection refused") + + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{ + "url": "https://example.com", + }, + } + + result := exec.Execute(context.Background(), task) + + if result.Success { + t.Error("expected failure on capture execution error") + } + if result.Error == "" { + t.Error("expected error message") + } +} + +func TestVerifyExecutor_Execute_NonZeroExitCode(t *testing.T) { + cmdExec := newMockCommandExecutor() + cmdExec.result = &domain.CommandResult{ + ExitCode: 1, + DurationMs: 100, + } + + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{ + "url": "https://example.com", + }, + } + + result := exec.Execute(context.Background(), task) + + if result.Success { + t.Error("expected failure on non-zero exit code") + } +} + +func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) { + cmdExec := newMockCommandExecutor() + cmdExec.output = []domain.OutputLine{ + {Stream: "stdout", Line: "not valid json", Timestamp: time.Now()}, + } + + exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + + task := &domain.WorkTask{ + ID: "task-1", + ProjectID: "project-1", + Type: domain.WorkTaskTypeVerify, + Spec: map[string]any{ + "url": "https://example.com", + }, + } + + result := exec.Execute(context.Background(), task) + + if result.Success { + t.Error("expected failure on invalid JSON manifest") + } +} + +func TestVerifyExecutor_ParseSpec(t *testing.T) { + exec := NewVerifyExecutor(nil, nil, nil, nil) + + t.Run("valid spec with all fields", func(t *testing.T) { + spec, err := exec.parseSpec(map[string]any{ + "url": "https://example.com", + "viewports": []any{"1920x1080", "800x600"}, + "wait_for": "#main", + "wait_timeout": float64(5000), + "full_page": true, + "video": true, + "evaluate": true, + "prompt": "Check for hero section", + "callback_url": "https://webhook.example.com/notify", + }) + if err != nil { + t.Fatalf("parseSpec() error = %v", err) + } + if spec.URL != "https://example.com" { + t.Errorf("URL = %q", spec.URL) + } + if len(spec.Viewports) != 2 { + t.Errorf("Viewports count = %d", len(spec.Viewports)) + } + if spec.WaitFor != "#main" { + t.Errorf("WaitFor = %q", spec.WaitFor) + } + if spec.WaitTimeout != 5000 { + t.Errorf("WaitTimeout = %d", spec.WaitTimeout) + } + if !spec.FullPage { + t.Error("expected FullPage = true") + } + if !spec.Video { + t.Error("expected Video = true") + } + if !spec.Evaluate { + t.Error("expected Evaluate = true") + } + if spec.Prompt != "Check for hero section" { + t.Errorf("Prompt = %q", spec.Prompt) + } + if spec.CallbackURL != "https://webhook.example.com/notify" { + t.Errorf("CallbackURL = %q", spec.CallbackURL) + } + }) + + t.Run("minimal spec", func(t *testing.T) { + spec, err := exec.parseSpec(map[string]any{ + "url": "https://example.com", + }) + if err != nil { + t.Fatalf("parseSpec() error = %v", err) + } + if spec.URL != "https://example.com" { + t.Errorf("URL = %q", spec.URL) + } + // Other fields should be zero/empty + if len(spec.Viewports) != 0 { + t.Errorf("expected empty viewports, got %v", spec.Viewports) + } + }) + + t.Run("missing URL", func(t *testing.T) { + _, err := exec.parseSpec(map[string]any{ + "viewports": []any{"1920x1080"}, + }) + if err == nil { + t.Error("expected error for missing URL") + } + }) +} + +func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) { + exec := NewVerifyExecutor(nil, nil, nil, nil) + + spec := &domain.VerifySpec{ + URL: "https://example.com/page", + Viewports: []string{"1920x1080", "375x667"}, + WaitFor: "#app", + FullPage: true, + Video: true, + } + + args := exec.buildCaptureCommand(spec, "/captures/task-123") + + // Check command structure + if args[0] != "node" { + t.Errorf("expected 'node', got %q", args[0]) + } + if args[1] != "/scripts/capture.js" { + t.Errorf("expected '/scripts/capture.js', got %q", args[1]) + } + + // Check URL is included + found := false + for _, arg := range args { + if arg == "--url=https://example.com/page" { + found = true + break + } + } + if !found { + t.Errorf("URL argument not found in %v", args) + } + + // Check viewports + found = false + for _, arg := range args { + if arg == "--viewports=1920x1080,375x667" { + found = true + break + } + } + if !found { + t.Errorf("viewports argument not found in %v", args) + } + + // Check full-page flag + found = false + for _, arg := range args { + if arg == "--full-page=true" { + found = true + break + } + } + if !found { + t.Errorf("full-page argument not found in %v", args) + } + + // Check video flag + found = false + for _, arg := range args { + if arg == "--video=true" { + found = true + break + } + } + if !found { + t.Errorf("video argument not found in %v", args) + } +} + +func TestVerifyExecutor_Config(t *testing.T) { + t.Run("default config", func(t *testing.T) { + exec := NewVerifyExecutor(nil, nil, nil, nil) + if exec.namespace != "rdev" { + t.Errorf("namespace = %q, want 'rdev'", exec.namespace) + } + if exec.podName != "playwright-0" { + t.Errorf("podName = %q, want 'playwright-0'", exec.podName) + } + }) + + t.Run("custom config", func(t *testing.T) { + exec := NewVerifyExecutor(nil, nil, nil, &VerifyExecutorConfig{ + Namespace: "custom-ns", + PodName: "custom-pod-0", + }) + if exec.namespace != "custom-ns" { + t.Errorf("namespace = %q, want 'custom-ns'", exec.namespace) + } + if exec.podName != "custom-pod-0" { + t.Errorf("podName = %q, want 'custom-pod-0'", exec.podName) + } + }) +} diff --git a/internal/worker/work_executor.go b/internal/worker/work_executor.go index fcc65be..17c8188 100644 --- a/internal/worker/work_executor.go +++ b/internal/worker/work_executor.go @@ -260,25 +260,17 @@ func (e *WorkExecutor) tryClaimAndExecute() { ) } } else { - // Fail the task through work service (handles retry logic) - errMsg := result.Error - if errMsg == "" { - errMsg = "execution failed" + // Fail the task through worker service (updates audit + handles retry logic) + if result.Error == "" { + result.Error = "execution failed" } - // Classify the error to enable appropriate client handling - errorCode := domain.ClassifyAgentError(errMsg, result.Output) - - if err := e.workSvc.FailTaskWithCode(e.ctx, task.ID, errMsg, errorCode); err != nil { + if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil { e.logger.Error("failed to record task failure", "task_id", task.ID, "error", err, ) } - // Return worker to idle regardless - if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil { - e.logger.Warn("failed to heartbeat after failure", "error", err) - } } }