rdev/internal/worker/verify_executor.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

338 lines
9.1 KiB
Go

package worker
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// Verify event type constants for SSE streaming.
const (
VerifyEventStarted = "verify.started"
VerifyEventCapturing = "verify.capturing"
VerifyEventCaptured = "verify.captured"
VerifyEventCompleted = "verify.completed"
VerifyEventFailed = "verify.failed"
)
// VerifyExecutor handles WorkTaskTypeVerify tasks.
// It translates VerifySpec fields from the work task's Spec map,
// executes the Playwright capture script via kubectl exec,
// and returns a BuildResult for queue compatibility.
type VerifyExecutor struct {
cmdExecutor port.CommandExecutor // kubectl exec wrapper
streams port.StreamPublisher // SSE stream publisher for real-time events
namespace string // Kubernetes namespace for the pod
podName string // Playwright pod name (e.g., "playwright-0")
}
// VerifyExecutorConfig holds configuration for the verify executor.
type VerifyExecutorConfig struct {
Namespace string // Kubernetes namespace (e.g., "rdev")
PodName string // Playwright pod name (e.g., "playwright-0")
}
// NewVerifyExecutor creates a new verify executor.
func NewVerifyExecutor(
cmdExecutor port.CommandExecutor,
streams port.StreamPublisher,
cfg *VerifyExecutorConfig,
) *VerifyExecutor {
if cfg == nil {
cfg = &VerifyExecutorConfig{
Namespace: "rdev",
PodName: "playwright-0",
}
}
if cfg.Namespace == "" {
cfg.Namespace = "rdev"
}
if cfg.PodName == "" {
cfg.PodName = "playwright-0"
}
return &VerifyExecutor{
cmdExecutor: cmdExecutor,
streams: streams,
namespace: cfg.Namespace,
podName: cfg.PodName,
}
}
// Execute runs a verify task by capturing screenshots/video of a URL.
func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
log := logging.FromContext(ctx).WithWorker("verify-executor")
start := time.Now()
streamID := task.ID // Use task ID as stream ID for SSE
// Publish verify.started event
v.publishEvent(streamID, VerifyEventStarted, map[string]any{
"task_id": task.ID,
"project_id": task.ProjectID,
"started_at": start.Format(time.RFC3339),
})
// Parse VerifySpec from task.Spec
spec, err := v.parseSpec(task.Spec)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("invalid verify spec: %v", err),
})
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("invalid verify spec: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Apply defaults
spec = spec.WithDefaults()
// Build output directory using task ID for isolation
outputDir := fmt.Sprintf("/captures/%s", task.ID)
// Publish capturing event
v.publishEvent(streamID, VerifyEventCapturing, map[string]any{
"task_id": task.ID,
"url": spec.URL,
"viewports": spec.Viewports,
})
log.Info("executing verify capture",
"task_id", task.ID,
"project_id", task.ProjectID,
"url", spec.URL,
"viewports", spec.Viewports,
"pod", v.podName,
)
// Build capture command
cmdArgs := v.buildCaptureCommand(spec, outputDir)
// Execute via CommandExecutor
captureOutput, err := v.executeCapture(ctx, task.ID, cmdArgs)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("capture execution failed: %v", err),
"duration_ms": time.Since(start).Milliseconds(),
})
v.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("capture execution failed: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Parse JSON manifest from stdout
verifyResult, err := v.parseManifest(captureOutput)
if err != nil {
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("failed to parse capture manifest: %v", err),
"duration_ms": time.Since(start).Milliseconds(),
})
v.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("failed to parse capture manifest: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
verifyResult.Success = true
verifyResult.DurationMs = time.Since(start).Milliseconds()
// Publish captured event
v.publishEvent(streamID, VerifyEventCaptured, map[string]any{
"task_id": task.ID,
"screenshots": verifyResult.Screenshots,
"video": verifyResult.Video,
})
// Publish completion event
v.publishEvent(streamID, VerifyEventCompleted, map[string]any{
"task_id": task.ID,
"success": true,
"screenshots": verifyResult.Screenshots,
"video": verifyResult.Video,
"duration_ms": verifyResult.DurationMs,
})
v.closeStream(ctx, streamID)
// Convert to BuildResult for queue compatibility
return verifyResult.ToBuildResult()
}
// buildCaptureCommand constructs the node command arguments for capture.js.
func (v *VerifyExecutor) buildCaptureCommand(spec *domain.VerifySpec, outputDir string) []string {
args := []string{
"node", "/scripts/capture.js",
"--url=" + spec.URL,
"--output=" + outputDir,
"--viewports=" + strings.Join(spec.Viewports, ","),
}
if spec.WaitFor != "" {
args = append(args, "--wait-for="+spec.WaitFor)
}
if spec.FullPage {
args = append(args, "--full-page=true")
}
if spec.Video {
args = append(args, "--video=true")
}
return args
}
// executeCapture runs the capture command in the Playwright pod.
func (v *VerifyExecutor) executeCapture(ctx context.Context, taskID string, cmdArgs []string) (string, error) {
// Create a command to execute
cmd := &domain.Command{
ID: domain.CommandID(fmt.Sprintf("verify-%s", taskID)),
ProjectID: "", // Verify tasks aren't project-specific
Type: domain.CommandTypeShell,
Args: cmdArgs,
StartedAt: time.Now(),
}
var outputBuilder strings.Builder
// Execute the command and capture output
result, err := v.cmdExecutor.Execute(ctx, cmd, v.podName, func(line domain.OutputLine) {
// Capture all output for parsing
if line.Stream == "stdout" {
outputBuilder.WriteString(line.Line)
outputBuilder.WriteString("\n")
}
})
if err != nil {
return "", fmt.Errorf("kubectl exec failed: %w", err)
}
if !result.Success() {
return "", fmt.Errorf("capture script failed with exit code %d", result.ExitCode)
}
return outputBuilder.String(), nil
}
// parseManifest parses the JSON capture manifest from the script output.
func (v *VerifyExecutor) parseManifest(output string) (*domain.VerifyResult, error) {
// The capture script outputs JSON to stdout
// Find the JSON line (last non-empty line)
lines := strings.Split(strings.TrimSpace(output), "\n")
if len(lines) == 0 {
return nil, fmt.Errorf("no output from capture script")
}
jsonLine := lines[len(lines)-1]
var manifest struct {
Screenshots map[string]string `json:"screenshots"`
Video string `json:"video,omitempty"`
}
if err := json.Unmarshal([]byte(jsonLine), &manifest); err != nil {
return nil, fmt.Errorf("invalid JSON manifest: %w", err)
}
return &domain.VerifyResult{
Screenshots: manifest.Screenshots,
Video: manifest.Video,
}, nil
}
// parseSpec extracts typed VerifySpec fields from the generic map[string]any.
func (v *VerifyExecutor) parseSpec(spec map[string]any) (*domain.VerifySpec, error) {
url, _ := spec["url"].(string)
if url == "" {
return nil, domain.ErrVerifyURLRequired
}
vs := &domain.VerifySpec{
URL: url,
}
// Parse optional fields
if viewports, ok := spec["viewports"].([]any); ok {
for _, vp := range viewports {
if s, ok := vp.(string); ok {
vs.Viewports = append(vs.Viewports, s)
}
}
}
if waitFor, ok := spec["wait_for"].(string); ok {
vs.WaitFor = waitFor
}
if waitTimeout, ok := spec["wait_timeout"].(float64); ok {
vs.WaitTimeout = int(waitTimeout)
}
if fullPage, ok := spec["full_page"].(bool); ok {
vs.FullPage = fullPage
}
if video, ok := spec["video"].(bool); ok {
vs.Video = video
}
if evaluate, ok := spec["evaluate"].(bool); ok {
vs.Evaluate = evaluate
}
if prompt, ok := spec["prompt"].(string); ok {
vs.Prompt = prompt
}
if callbackURL, ok := spec["callback_url"].(string); ok {
vs.CallbackURL = callbackURL
}
// Validate the spec
if err := vs.Validate(); err != nil {
return nil, err
}
return vs, nil
}
// publishEvent publishes an event to the SSE stream if a stream publisher is configured.
func (v *VerifyExecutor) publishEvent(streamID, eventType string, data map[string]any) {
if v.streams == nil {
return
}
v.streams.Publish(streamID, port.StreamEvent{
Type: eventType,
Data: data,
})
}
// closeStream closes the stream after a delay to allow clients to receive final events.
func (v *VerifyExecutor) closeStream(ctx context.Context, streamID string) {
if v.streams == nil {
return
}
// Close stream after a short delay to ensure final events are delivered.
go func() {
select {
case <-ctx.Done():
v.streams.Close(streamID)
case <-time.After(streamCloseDelay):
v.streams.Close(streamID)
}
}()
}