package worker

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
)

// Verify event type constants for SSE streaming.
const (
	VerifyEventStarted   = "verify.started"
	VerifyEventCapturing = "verify.capturing"
	VerifyEventCaptured  = "verify.captured"
	VerifyEventCompleted = "verify.completed"
	VerifyEventFailed    = "verify.failed"
)

// VerifyExecutor handles WorkTaskTypeVerify tasks.
// It translates VerifySpec fields from the work task's Spec map,
// executes the Playwright capture script via kubectl exec,
// and returns a BuildResult for queue compatibility.
type VerifyExecutor struct {
	cmdExecutor port.CommandExecutor // kubectl exec wrapper
	streams     port.StreamPublisher // SSE stream publisher for real-time events
	logger      *slog.Logger
	namespace   string // Kubernetes namespace for the pod
	podName     string // Playwright pod name (e.g., "playwright-0")
}

// VerifyExecutorConfig holds configuration for the verify executor.
// Empty fields fall back to the defaults applied by NewVerifyExecutor.
type VerifyExecutorConfig struct {
	Namespace string // Kubernetes namespace (e.g., "rdev")
	PodName   string // Playwright pod name (e.g., "playwright-0")
}

// NewVerifyExecutor creates a new verify executor.
//
// A nil logger is replaced with slog.Default(). A nil cfg, or empty fields in
// cfg, fall back to namespace "rdev" and pod "playwright-0". The cfg value is
// only read, never modified, so callers can safely share one config between
// several constructors. A nil streams publisher is allowed; event publishing
// and stream closing then become no-ops.
func NewVerifyExecutor(
	cmdExecutor port.CommandExecutor,
	streams port.StreamPublisher,
	logger *slog.Logger,
	cfg *VerifyExecutorConfig,
) *VerifyExecutor {
	const (
		defaultNamespace = "rdev"
		defaultPodName   = "playwright-0"
	)

	if logger == nil {
		logger = slog.Default()
	}

	// Resolve settings into locals instead of writing the defaults back
	// through cfg: the pointer belongs to the caller.
	namespace, podName := defaultNamespace, defaultPodName
	if cfg != nil {
		if cfg.Namespace != "" {
			namespace = cfg.Namespace
		}
		if cfg.PodName != "" {
			podName = cfg.PodName
		}
	}

	return &VerifyExecutor{
		cmdExecutor: cmdExecutor,
		streams:     streams,
		logger:      logger.With("component", "verify-executor"),
		namespace:   namespace,
		podName:     podName,
	}
}

// Execute runs a verify task by capturing screenshots/video of a URL.
func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { start := time.Now() streamID := task.ID // Use task ID as stream ID for SSE // Publish verify.started event v.publishEvent(streamID, VerifyEventStarted, map[string]any{ "task_id": task.ID, "project_id": task.ProjectID, "started_at": start.Format(time.RFC3339), }) // Parse VerifySpec from task.Spec spec, err := v.parseSpec(task.Spec) if err != nil { v.publishEvent(streamID, VerifyEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("invalid verify spec: %v", err), }) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("invalid verify spec: %v", err), DurationMs: time.Since(start).Milliseconds(), } } // Apply defaults spec = spec.WithDefaults() // Build output directory using task ID for isolation outputDir := fmt.Sprintf("/captures/%s", task.ID) // Publish capturing event v.publishEvent(streamID, VerifyEventCapturing, map[string]any{ "task_id": task.ID, "url": spec.URL, "viewports": spec.Viewports, }) v.logger.Info("executing verify capture", "task_id", task.ID, "project_id", task.ProjectID, "url", spec.URL, "viewports", spec.Viewports, "pod", v.podName, ) // Build capture command cmdArgs := v.buildCaptureCommand(spec, outputDir) // Execute via CommandExecutor captureOutput, err := v.executeCapture(ctx, task.ID, cmdArgs) if err != nil { v.publishEvent(streamID, VerifyEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("capture execution failed: %v", err), "duration_ms": time.Since(start).Milliseconds(), }) v.closeStream(ctx, streamID) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("capture execution failed: %v", err), DurationMs: time.Since(start).Milliseconds(), } } // Parse JSON manifest from stdout verifyResult, err := v.parseManifest(captureOutput) if err != nil { v.publishEvent(streamID, VerifyEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("failed to parse capture manifest: %v", 
err), "duration_ms": time.Since(start).Milliseconds(), }) v.closeStream(ctx, streamID) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("failed to parse capture manifest: %v", err), DurationMs: time.Since(start).Milliseconds(), } } verifyResult.Success = true verifyResult.DurationMs = time.Since(start).Milliseconds() // Publish captured event v.publishEvent(streamID, VerifyEventCaptured, map[string]any{ "task_id": task.ID, "screenshots": verifyResult.Screenshots, "video": verifyResult.Video, }) // Publish completion event v.publishEvent(streamID, VerifyEventCompleted, map[string]any{ "task_id": task.ID, "success": true, "screenshots": verifyResult.Screenshots, "video": verifyResult.Video, "duration_ms": verifyResult.DurationMs, }) v.closeStream(ctx, streamID) // Convert to BuildResult for queue compatibility return verifyResult.ToBuildResult() } // buildCaptureCommand constructs the node command arguments for capture.js. func (v *VerifyExecutor) buildCaptureCommand(spec *domain.VerifySpec, outputDir string) []string { args := []string{ "node", "/scripts/capture.js", "--url=" + spec.URL, "--output=" + outputDir, "--viewports=" + strings.Join(spec.Viewports, ","), } if spec.WaitFor != "" { args = append(args, "--wait-for="+spec.WaitFor) } if spec.FullPage { args = append(args, "--full-page=true") } if spec.Video { args = append(args, "--video=true") } return args } // executeCapture runs the capture command in the Playwright pod. 
func (v *VerifyExecutor) executeCapture(ctx context.Context, taskID string, cmdArgs []string) (string, error) { // Create a command to execute cmd := &domain.Command{ ID: domain.CommandID(fmt.Sprintf("verify-%s", taskID)), ProjectID: "", // Verify tasks aren't project-specific Type: domain.CommandTypeShell, Args: cmdArgs, StartedAt: time.Now(), } var outputBuilder strings.Builder // Execute the command and capture output result, err := v.cmdExecutor.Execute(ctx, cmd, v.podName, func(line domain.OutputLine) { // Capture all output for parsing if line.Stream == "stdout" { outputBuilder.WriteString(line.Line) outputBuilder.WriteString("\n") } }) if err != nil { return "", fmt.Errorf("kubectl exec failed: %w", err) } if !result.Success() { return "", fmt.Errorf("capture script failed with exit code %d", result.ExitCode) } return outputBuilder.String(), nil } // parseManifest parses the JSON capture manifest from the script output. func (v *VerifyExecutor) parseManifest(output string) (*domain.VerifyResult, error) { // The capture script outputs JSON to stdout // Find the JSON line (last non-empty line) lines := strings.Split(strings.TrimSpace(output), "\n") if len(lines) == 0 { return nil, fmt.Errorf("no output from capture script") } jsonLine := lines[len(lines)-1] var manifest struct { Screenshots map[string]string `json:"screenshots"` Video string `json:"video,omitempty"` } if err := json.Unmarshal([]byte(jsonLine), &manifest); err != nil { return nil, fmt.Errorf("invalid JSON manifest: %w", err) } return &domain.VerifyResult{ Screenshots: manifest.Screenshots, Video: manifest.Video, }, nil } // parseSpec extracts typed VerifySpec fields from the generic map[string]any. 
func (v *VerifyExecutor) parseSpec(spec map[string]any) (*domain.VerifySpec, error) { url, _ := spec["url"].(string) if url == "" { return nil, domain.ErrVerifyURLRequired } vs := &domain.VerifySpec{ URL: url, } // Parse optional fields if viewports, ok := spec["viewports"].([]any); ok { for _, vp := range viewports { if s, ok := vp.(string); ok { vs.Viewports = append(vs.Viewports, s) } } } if waitFor, ok := spec["wait_for"].(string); ok { vs.WaitFor = waitFor } if waitTimeout, ok := spec["wait_timeout"].(float64); ok { vs.WaitTimeout = int(waitTimeout) } if fullPage, ok := spec["full_page"].(bool); ok { vs.FullPage = fullPage } if video, ok := spec["video"].(bool); ok { vs.Video = video } if evaluate, ok := spec["evaluate"].(bool); ok { vs.Evaluate = evaluate } if prompt, ok := spec["prompt"].(string); ok { vs.Prompt = prompt } if callbackURL, ok := spec["callback_url"].(string); ok { vs.CallbackURL = callbackURL } // Validate the spec if err := vs.Validate(); err != nil { return nil, err } return vs, nil } // publishEvent publishes an event to the SSE stream if a stream publisher is configured. func (v *VerifyExecutor) publishEvent(streamID, eventType string, data map[string]any) { if v.streams == nil { return } v.streams.Publish(streamID, port.StreamEvent{ Type: eventType, Data: data, }) } // closeStream closes the stream after a delay to allow clients to receive final events. func (v *VerifyExecutor) closeStream(ctx context.Context, streamID string) { if v.streams == nil { return } // Close stream after a short delay to ensure final events are delivered. go func() { select { case <-ctx.Done(): v.streams.Close(streamID) case <-time.After(streamCloseDelay): v.streams.Close(streamID) } }() }