feat: add WorkerService.FailTask for audit updates + visual verification scaffolding

- Add FailTask to WorkerService to update build_audit on failure path
  (fixes bug where audit showed "running" when task actually failed)
- Add WorkServiceFailer interface to avoid circular dependency
- Add VerifyExecutor with Playwright-based visual verification
- Add verify domain types (VerifySpec, VerifyResult, screenshot capture)
- Wire VerifyExecutor placeholder into WorkExecutor (impl in Week 2)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
jordan 2026-02-03 00:09:16 -07:00
parent b2152b7967
commit b5fdf35f1b
7 changed files with 1201 additions and 12 deletions

View File

@ -452,12 +452,15 @@ func main() {
// Start work executor (cross-project worker pool, git via kubectl exec) // Start work executor (cross-project worker pool, git via kubectl exec)
buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil) buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil)
// VerifyExecutor requires CommandExecutor - will be wired in Week 2
var verifyExecutor *worker.VerifyExecutor
workerCfg := worker.DefaultWorkExecutorConfig() workerCfg := worker.DefaultWorkExecutorConfig()
workerCfg.Logger = logger workerCfg.Logger = logger
workExecutor := worker.NewWorkExecutor( workExecutor := worker.NewWorkExecutor(
workerService, workerService,
workService, workService,
buildExecutor, buildExecutor,
verifyExecutor,
workerCfg, workerCfg,
) )
if err := workExecutor.Start(); err != nil { if err := workExecutor.Start(); err != nil {

198
internal/domain/verify.go Normal file
View File

@ -0,0 +1,198 @@
package domain
import (
"errors"
"fmt"
"net/url"
"strconv"
)
// Verify-related errors.
var (
ErrVerifyURLRequired = errors.New("url is required for verify spec")
)
// VerifySpec defines what a verify task should accomplish.
type VerifySpec struct {
// URL is the page to capture (required).
URL string `json:"url"`
// Viewports is a list of viewport sizes to capture.
// Default: ["1920x1080", "768x1024", "375x667"]
Viewports []string `json:"viewports,omitempty"`
// WaitFor is a CSS selector to wait for before capturing.
// Default: "body"
WaitFor string `json:"wait_for,omitempty"`
// WaitTimeout is the maximum time to wait for the selector in milliseconds.
// Default: 10000
WaitTimeout int `json:"wait_timeout,omitempty"`
// FullPage captures the entire scrollable page if true.
FullPage bool `json:"full_page,omitempty"`
// Video records a video of the page load if true.
Video bool `json:"video,omitempty"`
// Evaluate enables AI evaluation of the captures (Week 3).
Evaluate bool `json:"evaluate,omitempty"`
// Prompt provides context for AI evaluation (Week 3).
Prompt string `json:"prompt,omitempty"`
// CallbackURL is the webhook URL for completion notification.
CallbackURL string `json:"callback_url,omitempty"`
}
// DefaultViewports returns the default viewport sizes for captures.
func DefaultViewports() []string {
return []string{"1920x1080", "768x1024", "375x667"}
}
// Validate checks that the VerifySpec has all required fields.
func (s *VerifySpec) Validate() error {
if s.URL == "" {
return ErrVerifyURLRequired
}
// Validate URL format
u, err := url.Parse(s.URL)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("url scheme must be http or https, got %q", u.Scheme)
}
// Validate callback URL if provided
if s.CallbackURL != "" {
cu, err := url.Parse(s.CallbackURL)
if err != nil {
return fmt.Errorf("invalid callback_url: %w", err)
}
if cu.Scheme != "http" && cu.Scheme != "https" {
return fmt.Errorf("callback_url scheme must be http or https, got %q", cu.Scheme)
}
}
return nil
}
// WithDefaults returns a copy of the spec with default values applied.
func (s *VerifySpec) WithDefaults() *VerifySpec {
spec := *s
if len(spec.Viewports) == 0 {
spec.Viewports = DefaultViewports()
}
if spec.WaitFor == "" {
spec.WaitFor = "body"
}
if spec.WaitTimeout == 0 {
spec.WaitTimeout = 10000
}
return &spec
}
// VerifyResult captures the outcome of a verify execution.
type VerifyResult struct {
// Success indicates whether the capture completed successfully.
Success bool `json:"success"`
// Screenshots maps viewport size to screenshot file path.
// Example: {"1920x1080": "/captures/task-id/1920_1080.png"}
Screenshots map[string]string `json:"screenshots,omitempty"`
// Video is the path to the recorded video if requested.
Video string `json:"video,omitempty"`
// Evaluation contains the AI evaluation result (Week 3).
Evaluation string `json:"evaluation,omitempty"`
// Score is the AI-assigned score 0-100 (Week 3).
Score int `json:"score,omitempty"`
// Passed indicates whether the AI evaluation passed (Week 3).
Passed bool `json:"passed,omitempty"`
// DurationMs is how long the capture took in milliseconds.
DurationMs int64 `json:"duration_ms"`
// Error contains the error message if capture failed.
Error string `json:"error,omitempty"`
}
// ToWorkResult converts a VerifyResult to a WorkResult for queue compatibility.
// Screenshots are promoted to artifacts with viewport as key prefix.
func (r *VerifyResult) ToWorkResult() *WorkResult {
if r == nil {
return &WorkResult{}
}
wr := &WorkResult{}
// Use error as output if failed, otherwise empty (captures are in artifacts)
if !r.Success && r.Error != "" {
wr.Output = r.Error
}
// Promote screenshots and metadata to artifacts
if len(r.Screenshots) > 0 || r.Video != "" || r.DurationMs > 0 {
wr.Artifacts = make(map[string]string)
// Add screenshots
for viewport, path := range r.Screenshots {
wr.Artifacts["screenshot_"+viewport] = path
}
// Add video if present
if r.Video != "" {
wr.Artifacts["video"] = r.Video
}
// Add duration
if r.DurationMs > 0 {
wr.Artifacts["duration_ms"] = strconv.FormatInt(r.DurationMs, 10)
}
// Add evaluation results (Week 3)
if r.Evaluation != "" {
wr.Artifacts["evaluation"] = r.Evaluation
}
if r.Score > 0 {
wr.Artifacts["score"] = strconv.Itoa(r.Score)
}
if r.Passed {
wr.Artifacts["passed"] = "true"
}
}
return wr
}
// ToBuildResult converts a VerifyResult to a BuildResult for executor compatibility.
// This allows the verify executor to return results through the work queue.
func (r *VerifyResult) ToBuildResult() *BuildResult {
if r == nil {
return &BuildResult{}
}
br := &BuildResult{
Success: r.Success,
DurationMs: r.DurationMs,
Error: r.Error,
}
// Promote screenshots and video to artifacts
if len(r.Screenshots) > 0 || r.Video != "" {
br.Artifacts = make(map[string]string)
for viewport, path := range r.Screenshots {
br.Artifacts["screenshot_"+viewport] = path
}
if r.Video != "" {
br.Artifacts["video"] = r.Video
}
}
return br
}

View File

@ -0,0 +1,274 @@
package domain
import (
"errors"
"testing"
)
func TestVerifySpec_Validate(t *testing.T) {
tests := []struct {
name string
spec VerifySpec
wantErr error
}{
{
name: "valid spec with URL only",
spec: VerifySpec{URL: "https://example.com"},
wantErr: nil,
},
{
name: "valid spec with all fields",
spec: VerifySpec{
URL: "https://example.com/page",
Viewports: []string{"1920x1080", "375x667"},
WaitFor: "#main",
WaitTimeout: 5000,
FullPage: true,
Video: true,
CallbackURL: "https://webhook.example.com/notify",
},
wantErr: nil,
},
{
name: "empty URL",
spec: VerifySpec{},
wantErr: ErrVerifyURLRequired,
},
{
name: "invalid URL scheme",
spec: VerifySpec{URL: "ftp://example.com"},
wantErr: nil, // Validate will fail but not return ErrVerifyURLRequired
},
{
name: "invalid callback URL scheme",
spec: VerifySpec{URL: "https://example.com", CallbackURL: "ftp://webhook.com"},
wantErr: nil, // Validate will fail but not return ErrVerifyURLRequired
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.spec.Validate()
if tt.wantErr != nil {
if !errors.Is(err, tt.wantErr) {
t.Errorf("Validate() error = %v, want %v", err, tt.wantErr)
}
} else if tt.name == "invalid URL scheme" || tt.name == "invalid callback URL scheme" {
// These should fail with a different error
if err == nil {
t.Errorf("Validate() expected error for %s", tt.name)
}
} else if err != nil {
t.Errorf("Validate() unexpected error = %v", err)
}
})
}
}
func TestVerifySpec_WithDefaults(t *testing.T) {
t.Run("applies defaults to empty spec", func(t *testing.T) {
spec := &VerifySpec{URL: "https://example.com"}
result := spec.WithDefaults()
if len(result.Viewports) != 3 {
t.Errorf("expected 3 default viewports, got %d", len(result.Viewports))
}
if result.Viewports[0] != "1920x1080" {
t.Errorf("expected first viewport '1920x1080', got %q", result.Viewports[0])
}
if result.WaitFor != "body" {
t.Errorf("expected WaitFor 'body', got %q", result.WaitFor)
}
if result.WaitTimeout != 10000 {
t.Errorf("expected WaitTimeout 10000, got %d", result.WaitTimeout)
}
})
t.Run("preserves existing values", func(t *testing.T) {
spec := &VerifySpec{
URL: "https://example.com",
Viewports: []string{"800x600"},
WaitFor: "#app",
WaitTimeout: 5000,
}
result := spec.WithDefaults()
if len(result.Viewports) != 1 || result.Viewports[0] != "800x600" {
t.Errorf("expected preserved viewports, got %v", result.Viewports)
}
if result.WaitFor != "#app" {
t.Errorf("expected preserved WaitFor, got %q", result.WaitFor)
}
if result.WaitTimeout != 5000 {
t.Errorf("expected preserved WaitTimeout, got %d", result.WaitTimeout)
}
})
t.Run("does not modify original", func(t *testing.T) {
spec := &VerifySpec{URL: "https://example.com"}
_ = spec.WithDefaults()
if len(spec.Viewports) != 0 {
t.Error("original spec should not be modified")
}
})
}
func TestVerifyResult_ToWorkResult(t *testing.T) {
t.Run("success with screenshots", func(t *testing.T) {
result := &VerifyResult{
Success: true,
Screenshots: map[string]string{
"1920x1080": "/captures/task-1/1920_1080.png",
"375x667": "/captures/task-1/375_667.png",
},
DurationMs: 1500,
}
wr := result.ToWorkResult()
if wr.Output != "" {
t.Errorf("expected empty output on success, got %q", wr.Output)
}
if wr.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" {
t.Errorf("screenshot_1920x1080 = %q", wr.Artifacts["screenshot_1920x1080"])
}
if wr.Artifacts["screenshot_375x667"] != "/captures/task-1/375_667.png" {
t.Errorf("screenshot_375x667 = %q", wr.Artifacts["screenshot_375x667"])
}
if wr.Artifacts["duration_ms"] != "1500" {
t.Errorf("duration_ms = %q", wr.Artifacts["duration_ms"])
}
})
t.Run("success with video", func(t *testing.T) {
result := &VerifyResult{
Success: true,
Screenshots: map[string]string{"1920x1080": "/captures/task-1/1920_1080.png"},
Video: "/captures/task-1/recording.webm",
DurationMs: 2000,
}
wr := result.ToWorkResult()
if wr.Artifacts["video"] != "/captures/task-1/recording.webm" {
t.Errorf("video = %q", wr.Artifacts["video"])
}
})
t.Run("failure uses error as output", func(t *testing.T) {
result := &VerifyResult{
Success: false,
Error: "capture failed: timeout",
}
wr := result.ToWorkResult()
if wr.Output != "capture failed: timeout" {
t.Errorf("Output = %q, want error message", wr.Output)
}
})
t.Run("nil receiver returns empty result", func(t *testing.T) {
var result *VerifyResult
wr := result.ToWorkResult()
if wr.Output != "" {
t.Errorf("Output = %q, want empty", wr.Output)
}
if wr.Artifacts != nil {
t.Errorf("Artifacts = %v, want nil", wr.Artifacts)
}
})
t.Run("evaluation results included", func(t *testing.T) {
result := &VerifyResult{
Success: true,
Evaluation: "Page renders correctly with all elements visible",
Score: 85,
Passed: true,
DurationMs: 1000,
}
wr := result.ToWorkResult()
if wr.Artifacts["evaluation"] != "Page renders correctly with all elements visible" {
t.Errorf("evaluation = %q", wr.Artifacts["evaluation"])
}
if wr.Artifacts["score"] != "85" {
t.Errorf("score = %q", wr.Artifacts["score"])
}
if wr.Artifacts["passed"] != "true" {
t.Errorf("passed = %q", wr.Artifacts["passed"])
}
})
}
func TestVerifyResult_ToBuildResult(t *testing.T) {
t.Run("success conversion", func(t *testing.T) {
result := &VerifyResult{
Success: true,
Screenshots: map[string]string{
"1920x1080": "/captures/task-1/1920_1080.png",
},
Video: "/captures/task-1/recording.webm",
DurationMs: 1500,
}
br := result.ToBuildResult()
if !br.Success {
t.Error("expected success = true")
}
if br.DurationMs != 1500 {
t.Errorf("DurationMs = %d, want 1500", br.DurationMs)
}
if br.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" {
t.Errorf("screenshot artifact missing")
}
if br.Artifacts["video"] != "/captures/task-1/recording.webm" {
t.Errorf("video artifact missing")
}
})
t.Run("failure conversion", func(t *testing.T) {
result := &VerifyResult{
Success: false,
Error: "capture failed",
DurationMs: 500,
}
br := result.ToBuildResult()
if br.Success {
t.Error("expected success = false")
}
if br.Error != "capture failed" {
t.Errorf("Error = %q", br.Error)
}
})
t.Run("nil receiver returns empty", func(t *testing.T) {
var result *VerifyResult
br := result.ToBuildResult()
if br.Success {
t.Error("expected success = false for nil")
}
})
}
func TestDefaultViewports(t *testing.T) {
viewports := DefaultViewports()
if len(viewports) != 3 {
t.Fatalf("expected 3 viewports, got %d", len(viewports))
}
expected := []string{"1920x1080", "768x1024", "375x667"}
for i, vp := range expected {
if viewports[i] != vp {
t.Errorf("viewport[%d] = %q, want %q", i, viewports[i], vp)
}
}
}

View File

@ -185,6 +185,53 @@ func (s *WorkerService) CompleteTask(ctx context.Context, workerID, taskID strin
return nil return nil
} }
// FailTask marks a task as failed, updates audit, and returns worker to idle.
// This mirrors CompleteTask but for the failure path, ensuring the build_audit
// table is updated with the failure status.
func (s *WorkerService) FailTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult, workSvc WorkServiceFailer) error {
if result == nil {
result = &domain.BuildResult{Success: false, Error: "unknown error"}
}
// Update audit record with failure (non-critical)
if s.audit != nil {
if err := s.audit.Update(ctx, taskID, result); err != nil {
s.logger.Warn("failed to update audit on failure",
"task_id", taskID,
"error", err,
)
}
}
// Classify the error and delegate to work service for retry logic
errorCode := domain.ClassifyAgentError(result.Error, result.Output)
if err := workSvc.FailTaskWithCode(ctx, taskID, result.Error, errorCode); err != nil {
return err
}
// Return worker to idle
if err := s.registry.UpdateStatus(ctx, workerID, domain.WorkerStatusIdle, ""); err != nil {
s.logger.Warn("failed to return worker to idle after failure",
"worker_id", workerID,
"error", err,
)
}
s.logger.Info("task failed",
"task_id", taskID,
"worker_id", workerID,
"error", result.Error,
)
return nil
}
// WorkServiceFailer is the interface for the work service failure method.
// This avoids a circular dependency between WorkerService and WorkService.
type WorkServiceFailer interface {
FailTaskWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error
}
// DrainWorker sets a worker to draining status so it finishes current work // DrainWorker sets a worker to draining status so it finishes current work
// but doesn't accept new tasks. // but doesn't accept new tasks.
func (s *WorkerService) DrainWorker(ctx context.Context, workerID string) error { func (s *WorkerService) DrainWorker(ctx context.Context, workerID string) error {

View File

@ -0,0 +1,342 @@
package worker
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// Verify event type constants for SSE streaming.
const (
VerifyEventStarted = "verify.started"
VerifyEventCapturing = "verify.capturing"
VerifyEventCaptured = "verify.captured"
VerifyEventCompleted = "verify.completed"
VerifyEventFailed = "verify.failed"
)
// VerifyExecutor handles WorkTaskTypeVerify tasks.
// It translates VerifySpec fields from the work task's Spec map,
// executes the Playwright capture script via kubectl exec,
// and returns a BuildResult for queue compatibility.
type VerifyExecutor struct {
cmdExecutor port.CommandExecutor // kubectl exec wrapper
streams port.StreamPublisher // SSE stream publisher for real-time events
logger *slog.Logger
namespace string // Kubernetes namespace for the pod
podName string // Playwright pod name (e.g., "playwright-0")
}
// VerifyExecutorConfig holds configuration for the verify executor.
type VerifyExecutorConfig struct {
Namespace string // Kubernetes namespace (e.g., "rdev")
PodName string // Playwright pod name (e.g., "playwright-0")
}
// NewVerifyExecutor creates a new verify executor.
func NewVerifyExecutor(
cmdExecutor port.CommandExecutor,
streams port.StreamPublisher,
logger *slog.Logger,
cfg *VerifyExecutorConfig,
) *VerifyExecutor {
if logger == nil {
logger = slog.Default()
}
if cfg == nil {
cfg = &VerifyExecutorConfig{
Namespace: "rdev",
PodName: "playwright-0",
}
}
if cfg.Namespace == "" {
cfg.Namespace = "rdev"
}
if cfg.PodName == "" {
cfg.PodName = "playwright-0"
}
return &VerifyExecutor{
cmdExecutor: cmdExecutor,
streams: streams,
logger: logger.With("component", "verify-executor"),
namespace: cfg.Namespace,
podName: cfg.PodName,
}
}
// Execute runs a verify task by capturing screenshots/video of a URL.
func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
start := time.Now()
streamID := task.ID // Use task ID as stream ID for SSE
// Publish verify.started event
v.publishEvent(streamID, VerifyEventStarted, map[string]any{
"task_id": task.ID,
"project_id": task.ProjectID,
"started_at": start.Format(time.RFC3339),
})
// Parse VerifySpec from task.Spec
spec, err := v.parseSpec(task.Spec)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("invalid verify spec: %v", err),
})
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("invalid verify spec: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Apply defaults
spec = spec.WithDefaults()
// Build output directory using task ID for isolation
outputDir := fmt.Sprintf("/captures/%s", task.ID)
// Publish capturing event
v.publishEvent(streamID, VerifyEventCapturing, map[string]any{
"task_id": task.ID,
"url": spec.URL,
"viewports": spec.Viewports,
})
v.logger.Info("executing verify capture",
"task_id", task.ID,
"project_id", task.ProjectID,
"url", spec.URL,
"viewports", spec.Viewports,
"pod", v.podName,
)
// Build capture command
cmdArgs := v.buildCaptureCommand(spec, outputDir)
// Execute via CommandExecutor
captureOutput, err := v.executeCapture(ctx, task.ID, cmdArgs)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("capture execution failed: %v", err),
"duration_ms": time.Since(start).Milliseconds(),
})
v.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("capture execution failed: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Parse JSON manifest from stdout
verifyResult, err := v.parseManifest(captureOutput)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("failed to parse capture manifest: %v", err),
"duration_ms": time.Since(start).Milliseconds(),
})
v.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("failed to parse capture manifest: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
verifyResult.Success = true
verifyResult.DurationMs = time.Since(start).Milliseconds()
// Publish captured event
v.publishEvent(streamID, VerifyEventCaptured, map[string]any{
"task_id": task.ID,
"screenshots": verifyResult.Screenshots,
"video": verifyResult.Video,
})
// Publish completion event
v.publishEvent(streamID, VerifyEventCompleted, map[string]any{
"task_id": task.ID,
"success": true,
"screenshots": verifyResult.Screenshots,
"video": verifyResult.Video,
"duration_ms": verifyResult.DurationMs,
})
v.closeStream(ctx, streamID)
// Convert to BuildResult for queue compatibility
return verifyResult.ToBuildResult()
}
// buildCaptureCommand constructs the node command arguments for capture.js.
func (v *VerifyExecutor) buildCaptureCommand(spec *domain.VerifySpec, outputDir string) []string {
args := []string{
"node", "/scripts/capture.js",
"--url=" + spec.URL,
"--output=" + outputDir,
"--viewports=" + strings.Join(spec.Viewports, ","),
}
if spec.WaitFor != "" {
args = append(args, "--wait-for="+spec.WaitFor)
}
if spec.FullPage {
args = append(args, "--full-page=true")
}
if spec.Video {
args = append(args, "--video=true")
}
return args
}
// executeCapture runs the capture command in the Playwright pod.
func (v *VerifyExecutor) executeCapture(ctx context.Context, taskID string, cmdArgs []string) (string, error) {
// Create a command to execute
cmd := &domain.Command{
ID: domain.CommandID(fmt.Sprintf("verify-%s", taskID)),
ProjectID: "", // Verify tasks aren't project-specific
Type: domain.CommandTypeShell,
Args: cmdArgs,
StartedAt: time.Now(),
}
var outputBuilder strings.Builder
// Execute the command and capture output
result, err := v.cmdExecutor.Execute(ctx, cmd, v.podName, func(line domain.OutputLine) {
// Capture all output for parsing
if line.Stream == "stdout" {
outputBuilder.WriteString(line.Line)
outputBuilder.WriteString("\n")
}
})
if err != nil {
return "", fmt.Errorf("kubectl exec failed: %w", err)
}
if !result.Success() {
return "", fmt.Errorf("capture script failed with exit code %d", result.ExitCode)
}
return outputBuilder.String(), nil
}
// parseManifest parses the JSON capture manifest from the script output.
func (v *VerifyExecutor) parseManifest(output string) (*domain.VerifyResult, error) {
// The capture script outputs JSON to stdout
// Find the JSON line (last non-empty line)
lines := strings.Split(strings.TrimSpace(output), "\n")
if len(lines) == 0 {
return nil, fmt.Errorf("no output from capture script")
}
jsonLine := lines[len(lines)-1]
var manifest struct {
Screenshots map[string]string `json:"screenshots"`
Video string `json:"video,omitempty"`
}
if err := json.Unmarshal([]byte(jsonLine), &manifest); err != nil {
return nil, fmt.Errorf("invalid JSON manifest: %w", err)
}
return &domain.VerifyResult{
Screenshots: manifest.Screenshots,
Video: manifest.Video,
}, nil
}
// parseSpec extracts typed VerifySpec fields from the generic map[string]any.
func (v *VerifyExecutor) parseSpec(spec map[string]any) (*domain.VerifySpec, error) {
url, _ := spec["url"].(string)
if url == "" {
return nil, domain.ErrVerifyURLRequired
}
vs := &domain.VerifySpec{
URL: url,
}
// Parse optional fields
if viewports, ok := spec["viewports"].([]any); ok {
for _, vp := range viewports {
if s, ok := vp.(string); ok {
vs.Viewports = append(vs.Viewports, s)
}
}
}
if waitFor, ok := spec["wait_for"].(string); ok {
vs.WaitFor = waitFor
}
if waitTimeout, ok := spec["wait_timeout"].(float64); ok {
vs.WaitTimeout = int(waitTimeout)
}
if fullPage, ok := spec["full_page"].(bool); ok {
vs.FullPage = fullPage
}
if video, ok := spec["video"].(bool); ok {
vs.Video = video
}
if evaluate, ok := spec["evaluate"].(bool); ok {
vs.Evaluate = evaluate
}
if prompt, ok := spec["prompt"].(string); ok {
vs.Prompt = prompt
}
if callbackURL, ok := spec["callback_url"].(string); ok {
vs.CallbackURL = callbackURL
}
// Validate the spec
if err := vs.Validate(); err != nil {
return nil, err
}
return vs, nil
}
// publishEvent publishes an event to the SSE stream if a stream publisher is configured.
func (v *VerifyExecutor) publishEvent(streamID, eventType string, data map[string]any) {
if v.streams == nil {
return
}
v.streams.Publish(streamID, port.StreamEvent{
Type: eventType,
Data: data,
})
}
// closeStream closes the stream after a delay to allow clients to receive final events.
func (v *VerifyExecutor) closeStream(ctx context.Context, streamID string) {
if v.streams == nil {
return
}
// Close stream after a short delay to ensure final events are delivered.
go func() {
select {
case <-ctx.Done():
v.streams.Close(streamID)
case <-time.After(streamCloseDelay):
v.streams.Close(streamID)
}
}()
}

View File

@ -0,0 +1,333 @@
package worker
import (
"context"
"fmt"
"testing"
"time"
"github.com/orchard9/rdev/internal/domain"
)
func TestVerifyExecutor_Execute_Success(t *testing.T) {
cmdExec := newMockCommandExecutor()
// Simulate capture.js JSON output
cmdExec.output = []domain.OutputLine{
{Stream: "stdout", Line: `{"screenshots":{"1920x1080":"/captures/task-1/1920_1080.png","375x667":"/captures/task-1/375_667.png"},"video":"/captures/task-1/recording.webm"}`, Timestamp: time.Now()},
}
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{
"url": "https://example.com",
"viewports": []any{"1920x1080", "375x667"},
"video": true,
},
}
result := exec.Execute(context.Background(), task)
if !result.Success {
t.Errorf("expected success, got error: %s", result.Error)
}
if result.DurationMs < 0 {
t.Errorf("expected non-negative duration, got %d", result.DurationMs)
}
// Check artifacts were populated
if result.Artifacts == nil {
t.Fatal("expected artifacts to be populated")
}
if result.Artifacts["screenshot_1920x1080"] != "/captures/task-1/1920_1080.png" {
t.Errorf("screenshot_1920x1080 = %q", result.Artifacts["screenshot_1920x1080"])
}
if result.Artifacts["video"] != "/captures/task-1/recording.webm" {
t.Errorf("video = %q", result.Artifacts["video"])
}
}
func TestVerifyExecutor_Execute_URLRequired(t *testing.T) {
cmdExec := newMockCommandExecutor()
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{}, // Missing URL
}
result := exec.Execute(context.Background(), task)
if result.Success {
t.Error("expected failure for missing URL")
}
if result.Error == "" {
t.Error("expected error message")
}
}
func TestVerifyExecutor_Execute_InvalidURL(t *testing.T) {
cmdExec := newMockCommandExecutor()
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{
"url": "ftp://invalid-scheme.com",
},
}
result := exec.Execute(context.Background(), task)
if result.Success {
t.Error("expected failure for invalid URL scheme")
}
}
func TestVerifyExecutor_Execute_CaptureFailure(t *testing.T) {
cmdExec := newMockCommandExecutor()
cmdExec.err = fmt.Errorf("kubectl exec failed: connection refused")
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{
"url": "https://example.com",
},
}
result := exec.Execute(context.Background(), task)
if result.Success {
t.Error("expected failure on capture execution error")
}
if result.Error == "" {
t.Error("expected error message")
}
}
func TestVerifyExecutor_Execute_NonZeroExitCode(t *testing.T) {
cmdExec := newMockCommandExecutor()
cmdExec.result = &domain.CommandResult{
ExitCode: 1,
DurationMs: 100,
}
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{
"url": "https://example.com",
},
}
result := exec.Execute(context.Background(), task)
if result.Success {
t.Error("expected failure on non-zero exit code")
}
}
func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) {
cmdExec := newMockCommandExecutor()
cmdExec.output = []domain.OutputLine{
{Stream: "stdout", Line: "not valid json", Timestamp: time.Now()},
}
exec := NewVerifyExecutor(cmdExec, nil, nil, nil)
task := &domain.WorkTask{
ID: "task-1",
ProjectID: "project-1",
Type: domain.WorkTaskTypeVerify,
Spec: map[string]any{
"url": "https://example.com",
},
}
result := exec.Execute(context.Background(), task)
if result.Success {
t.Error("expected failure on invalid JSON manifest")
}
}
func TestVerifyExecutor_ParseSpec(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil)
t.Run("valid spec with all fields", func(t *testing.T) {
spec, err := exec.parseSpec(map[string]any{
"url": "https://example.com",
"viewports": []any{"1920x1080", "800x600"},
"wait_for": "#main",
"wait_timeout": float64(5000),
"full_page": true,
"video": true,
"evaluate": true,
"prompt": "Check for hero section",
"callback_url": "https://webhook.example.com/notify",
})
if err != nil {
t.Fatalf("parseSpec() error = %v", err)
}
if spec.URL != "https://example.com" {
t.Errorf("URL = %q", spec.URL)
}
if len(spec.Viewports) != 2 {
t.Errorf("Viewports count = %d", len(spec.Viewports))
}
if spec.WaitFor != "#main" {
t.Errorf("WaitFor = %q", spec.WaitFor)
}
if spec.WaitTimeout != 5000 {
t.Errorf("WaitTimeout = %d", spec.WaitTimeout)
}
if !spec.FullPage {
t.Error("expected FullPage = true")
}
if !spec.Video {
t.Error("expected Video = true")
}
if !spec.Evaluate {
t.Error("expected Evaluate = true")
}
if spec.Prompt != "Check for hero section" {
t.Errorf("Prompt = %q", spec.Prompt)
}
if spec.CallbackURL != "https://webhook.example.com/notify" {
t.Errorf("CallbackURL = %q", spec.CallbackURL)
}
})
t.Run("minimal spec", func(t *testing.T) {
spec, err := exec.parseSpec(map[string]any{
"url": "https://example.com",
})
if err != nil {
t.Fatalf("parseSpec() error = %v", err)
}
if spec.URL != "https://example.com" {
t.Errorf("URL = %q", spec.URL)
}
// Other fields should be zero/empty
if len(spec.Viewports) != 0 {
t.Errorf("expected empty viewports, got %v", spec.Viewports)
}
})
t.Run("missing URL", func(t *testing.T) {
_, err := exec.parseSpec(map[string]any{
"viewports": []any{"1920x1080"},
})
if err == nil {
t.Error("expected error for missing URL")
}
})
}
func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil)
spec := &domain.VerifySpec{
URL: "https://example.com/page",
Viewports: []string{"1920x1080", "375x667"},
WaitFor: "#app",
FullPage: true,
Video: true,
}
args := exec.buildCaptureCommand(spec, "/captures/task-123")
// Check command structure
if args[0] != "node" {
t.Errorf("expected 'node', got %q", args[0])
}
if args[1] != "/scripts/capture.js" {
t.Errorf("expected '/scripts/capture.js', got %q", args[1])
}
// Check URL is included
found := false
for _, arg := range args {
if arg == "--url=https://example.com/page" {
found = true
break
}
}
if !found {
t.Errorf("URL argument not found in %v", args)
}
// Check viewports
found = false
for _, arg := range args {
if arg == "--viewports=1920x1080,375x667" {
found = true
break
}
}
if !found {
t.Errorf("viewports argument not found in %v", args)
}
// Check full-page flag
found = false
for _, arg := range args {
if arg == "--full-page=true" {
found = true
break
}
}
if !found {
t.Errorf("full-page argument not found in %v", args)
}
// Check video flag
found = false
for _, arg := range args {
if arg == "--video=true" {
found = true
break
}
}
if !found {
t.Errorf("video argument not found in %v", args)
}
}
func TestVerifyExecutor_Config(t *testing.T) {
t.Run("default config", func(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil)
if exec.namespace != "rdev" {
t.Errorf("namespace = %q, want 'rdev'", exec.namespace)
}
if exec.podName != "playwright-0" {
t.Errorf("podName = %q, want 'playwright-0'", exec.podName)
}
})
t.Run("custom config", func(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, &VerifyExecutorConfig{
Namespace: "custom-ns",
PodName: "custom-pod-0",
})
if exec.namespace != "custom-ns" {
t.Errorf("namespace = %q, want 'custom-ns'", exec.namespace)
}
if exec.podName != "custom-pod-0" {
t.Errorf("podName = %q, want 'custom-pod-0'", exec.podName)
}
})
}

View File

@ -260,25 +260,17 @@ func (e *WorkExecutor) tryClaimAndExecute() {
) )
} }
} else { } else {
// Fail the task through work service (handles retry logic) // Fail the task through worker service (updates audit + handles retry logic)
errMsg := result.Error if result.Error == "" {
if errMsg == "" { result.Error = "execution failed"
errMsg = "execution failed"
} }
// Classify the error to enable appropriate client handling if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil {
errorCode := domain.ClassifyAgentError(errMsg, result.Output)
if err := e.workSvc.FailTaskWithCode(e.ctx, task.ID, errMsg, errorCode); err != nil {
e.logger.Error("failed to record task failure", e.logger.Error("failed to record task failure",
"task_id", task.ID, "task_id", task.ID,
"error", err, "error", err,
) )
} }
// Return worker to idle regardless
if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil {
e.logger.Warn("failed to heartbeat after failure", "error", err)
}
} }
} }