Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
338 lines
9.1 KiB
Go
338 lines
9.1 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/logging"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// SSE event types published on a verify task's stream.
const (
	// VerifyEventStarted is published when Execute begins handling a task.
	VerifyEventStarted = "verify.started"
	// VerifyEventCapturing is published after the spec has been parsed,
	// just before the capture command is run.
	VerifyEventCapturing = "verify.capturing"
	// VerifyEventCaptured is published once the capture manifest has been parsed.
	VerifyEventCaptured = "verify.captured"
	// VerifyEventCompleted is published at the end of a successful run.
	VerifyEventCompleted = "verify.completed"
	// VerifyEventFailed is published when the spec, the capture command,
	// or the manifest parsing fails.
	VerifyEventFailed = "verify.failed"
)
|
|
|
|
// VerifyExecutor handles WorkTaskTypeVerify tasks.
|
|
// It translates VerifySpec fields from the work task's Spec map,
|
|
// executes the Playwright capture script via kubectl exec,
|
|
// and returns a BuildResult for queue compatibility.
|
|
type VerifyExecutor struct {
|
|
cmdExecutor port.CommandExecutor // kubectl exec wrapper
|
|
streams port.StreamPublisher // SSE stream publisher for real-time events
|
|
namespace string // Kubernetes namespace for the pod
|
|
podName string // Playwright pod name (e.g., "playwright-0")
|
|
}
|
|
|
|
// VerifyExecutorConfig carries the settings NewVerifyExecutor needs to locate
// the Playwright pod. Empty fields are replaced by defaults in the constructor.
type VerifyExecutorConfig struct {
	// Namespace is the Kubernetes namespace (e.g., "rdev").
	Namespace string

	// PodName is the Playwright pod name (e.g., "playwright-0").
	PodName string
}
|
|
|
|
// NewVerifyExecutor creates a new verify executor.
|
|
func NewVerifyExecutor(
|
|
cmdExecutor port.CommandExecutor,
|
|
streams port.StreamPublisher,
|
|
cfg *VerifyExecutorConfig,
|
|
) *VerifyExecutor {
|
|
if cfg == nil {
|
|
cfg = &VerifyExecutorConfig{
|
|
Namespace: "rdev",
|
|
PodName: "playwright-0",
|
|
}
|
|
}
|
|
if cfg.Namespace == "" {
|
|
cfg.Namespace = "rdev"
|
|
}
|
|
if cfg.PodName == "" {
|
|
cfg.PodName = "playwright-0"
|
|
}
|
|
return &VerifyExecutor{
|
|
cmdExecutor: cmdExecutor,
|
|
streams: streams,
|
|
namespace: cfg.Namespace,
|
|
podName: cfg.PodName,
|
|
}
|
|
}
|
|
|
|
// Execute runs a verify task by capturing screenshots/video of a URL.
|
|
func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
|
|
log := logging.FromContext(ctx).WithWorker("verify-executor")
|
|
start := time.Now()
|
|
streamID := task.ID // Use task ID as stream ID for SSE
|
|
|
|
// Publish verify.started event
|
|
v.publishEvent(streamID, VerifyEventStarted, map[string]any{
|
|
"task_id": task.ID,
|
|
"project_id": task.ProjectID,
|
|
"started_at": start.Format(time.RFC3339),
|
|
})
|
|
|
|
// Parse VerifySpec from task.Spec
|
|
spec, err := v.parseSpec(task.Spec)
|
|
if err != nil {
|
|
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
|
|
"task_id": task.ID,
|
|
"error": fmt.Sprintf("invalid verify spec: %v", err),
|
|
})
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: fmt.Sprintf("invalid verify spec: %v", err),
|
|
DurationMs: time.Since(start).Milliseconds(),
|
|
}
|
|
}
|
|
|
|
// Apply defaults
|
|
spec = spec.WithDefaults()
|
|
|
|
// Build output directory using task ID for isolation
|
|
outputDir := fmt.Sprintf("/captures/%s", task.ID)
|
|
|
|
// Publish capturing event
|
|
v.publishEvent(streamID, VerifyEventCapturing, map[string]any{
|
|
"task_id": task.ID,
|
|
"url": spec.URL,
|
|
"viewports": spec.Viewports,
|
|
})
|
|
|
|
log.Info("executing verify capture",
|
|
"task_id", task.ID,
|
|
"project_id", task.ProjectID,
|
|
"url", spec.URL,
|
|
"viewports", spec.Viewports,
|
|
"pod", v.podName,
|
|
)
|
|
|
|
// Build capture command
|
|
cmdArgs := v.buildCaptureCommand(spec, outputDir)
|
|
|
|
// Execute via CommandExecutor
|
|
captureOutput, err := v.executeCapture(ctx, task.ID, cmdArgs)
|
|
if err != nil {
|
|
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
|
|
"task_id": task.ID,
|
|
"error": fmt.Sprintf("capture execution failed: %v", err),
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
})
|
|
v.closeStream(ctx, streamID)
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: fmt.Sprintf("capture execution failed: %v", err),
|
|
DurationMs: time.Since(start).Milliseconds(),
|
|
}
|
|
}
|
|
|
|
// Parse JSON manifest from stdout
|
|
verifyResult, err := v.parseManifest(captureOutput)
|
|
if err != nil {
|
|
v.publishEvent(streamID, VerifyEventFailed, map[string]any{
|
|
"task_id": task.ID,
|
|
"error": fmt.Sprintf("failed to parse capture manifest: %v", err),
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
})
|
|
v.closeStream(ctx, streamID)
|
|
return &domain.BuildResult{
|
|
Success: false,
|
|
Error: fmt.Sprintf("failed to parse capture manifest: %v", err),
|
|
DurationMs: time.Since(start).Milliseconds(),
|
|
}
|
|
}
|
|
|
|
verifyResult.Success = true
|
|
verifyResult.DurationMs = time.Since(start).Milliseconds()
|
|
|
|
// Publish captured event
|
|
v.publishEvent(streamID, VerifyEventCaptured, map[string]any{
|
|
"task_id": task.ID,
|
|
"screenshots": verifyResult.Screenshots,
|
|
"video": verifyResult.Video,
|
|
})
|
|
|
|
// Publish completion event
|
|
v.publishEvent(streamID, VerifyEventCompleted, map[string]any{
|
|
"task_id": task.ID,
|
|
"success": true,
|
|
"screenshots": verifyResult.Screenshots,
|
|
"video": verifyResult.Video,
|
|
"duration_ms": verifyResult.DurationMs,
|
|
})
|
|
v.closeStream(ctx, streamID)
|
|
|
|
// Convert to BuildResult for queue compatibility
|
|
return verifyResult.ToBuildResult()
|
|
}
|
|
|
|
// buildCaptureCommand constructs the node command arguments for capture.js.
|
|
func (v *VerifyExecutor) buildCaptureCommand(spec *domain.VerifySpec, outputDir string) []string {
|
|
args := []string{
|
|
"node", "/scripts/capture.js",
|
|
"--url=" + spec.URL,
|
|
"--output=" + outputDir,
|
|
"--viewports=" + strings.Join(spec.Viewports, ","),
|
|
}
|
|
|
|
if spec.WaitFor != "" {
|
|
args = append(args, "--wait-for="+spec.WaitFor)
|
|
}
|
|
|
|
if spec.FullPage {
|
|
args = append(args, "--full-page=true")
|
|
}
|
|
|
|
if spec.Video {
|
|
args = append(args, "--video=true")
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
// executeCapture runs the capture command in the Playwright pod.
|
|
func (v *VerifyExecutor) executeCapture(ctx context.Context, taskID string, cmdArgs []string) (string, error) {
|
|
// Create a command to execute
|
|
cmd := &domain.Command{
|
|
ID: domain.CommandID(fmt.Sprintf("verify-%s", taskID)),
|
|
ProjectID: "", // Verify tasks aren't project-specific
|
|
Type: domain.CommandTypeShell,
|
|
Args: cmdArgs,
|
|
StartedAt: time.Now(),
|
|
}
|
|
|
|
var outputBuilder strings.Builder
|
|
|
|
// Execute the command and capture output
|
|
result, err := v.cmdExecutor.Execute(ctx, cmd, v.podName, func(line domain.OutputLine) {
|
|
// Capture all output for parsing
|
|
if line.Stream == "stdout" {
|
|
outputBuilder.WriteString(line.Line)
|
|
outputBuilder.WriteString("\n")
|
|
}
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("kubectl exec failed: %w", err)
|
|
}
|
|
|
|
if !result.Success() {
|
|
return "", fmt.Errorf("capture script failed with exit code %d", result.ExitCode)
|
|
}
|
|
|
|
return outputBuilder.String(), nil
|
|
}
|
|
|
|
// parseManifest parses the JSON capture manifest from the script output.
|
|
func (v *VerifyExecutor) parseManifest(output string) (*domain.VerifyResult, error) {
|
|
// The capture script outputs JSON to stdout
|
|
// Find the JSON line (last non-empty line)
|
|
lines := strings.Split(strings.TrimSpace(output), "\n")
|
|
if len(lines) == 0 {
|
|
return nil, fmt.Errorf("no output from capture script")
|
|
}
|
|
|
|
jsonLine := lines[len(lines)-1]
|
|
|
|
var manifest struct {
|
|
Screenshots map[string]string `json:"screenshots"`
|
|
Video string `json:"video,omitempty"`
|
|
}
|
|
|
|
if err := json.Unmarshal([]byte(jsonLine), &manifest); err != nil {
|
|
return nil, fmt.Errorf("invalid JSON manifest: %w", err)
|
|
}
|
|
|
|
return &domain.VerifyResult{
|
|
Screenshots: manifest.Screenshots,
|
|
Video: manifest.Video,
|
|
}, nil
|
|
}
|
|
|
|
// parseSpec extracts typed VerifySpec fields from the generic map[string]any.
|
|
func (v *VerifyExecutor) parseSpec(spec map[string]any) (*domain.VerifySpec, error) {
|
|
url, _ := spec["url"].(string)
|
|
if url == "" {
|
|
return nil, domain.ErrVerifyURLRequired
|
|
}
|
|
|
|
vs := &domain.VerifySpec{
|
|
URL: url,
|
|
}
|
|
|
|
// Parse optional fields
|
|
if viewports, ok := spec["viewports"].([]any); ok {
|
|
for _, vp := range viewports {
|
|
if s, ok := vp.(string); ok {
|
|
vs.Viewports = append(vs.Viewports, s)
|
|
}
|
|
}
|
|
}
|
|
|
|
if waitFor, ok := spec["wait_for"].(string); ok {
|
|
vs.WaitFor = waitFor
|
|
}
|
|
|
|
if waitTimeout, ok := spec["wait_timeout"].(float64); ok {
|
|
vs.WaitTimeout = int(waitTimeout)
|
|
}
|
|
|
|
if fullPage, ok := spec["full_page"].(bool); ok {
|
|
vs.FullPage = fullPage
|
|
}
|
|
|
|
if video, ok := spec["video"].(bool); ok {
|
|
vs.Video = video
|
|
}
|
|
|
|
if evaluate, ok := spec["evaluate"].(bool); ok {
|
|
vs.Evaluate = evaluate
|
|
}
|
|
|
|
if prompt, ok := spec["prompt"].(string); ok {
|
|
vs.Prompt = prompt
|
|
}
|
|
|
|
if callbackURL, ok := spec["callback_url"].(string); ok {
|
|
vs.CallbackURL = callbackURL
|
|
}
|
|
|
|
// Validate the spec
|
|
if err := vs.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return vs, nil
|
|
}
|
|
|
|
// publishEvent publishes an event to the SSE stream if a stream publisher is configured.
|
|
func (v *VerifyExecutor) publishEvent(streamID, eventType string, data map[string]any) {
|
|
if v.streams == nil {
|
|
return
|
|
}
|
|
v.streams.Publish(streamID, port.StreamEvent{
|
|
Type: eventType,
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// closeStream closes the stream after a delay to allow clients to receive final events.
|
|
func (v *VerifyExecutor) closeStream(ctx context.Context, streamID string) {
|
|
if v.streams == nil {
|
|
return
|
|
}
|
|
// Close stream after a short delay to ensure final events are delivered.
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
v.streams.Close(streamID)
|
|
case <-time.After(streamCloseDelay):
|
|
v.streams.Close(streamID)
|
|
}
|
|
}()
|
|
}
|