rdev/internal/adapter/sdlc/worker_executor.go
jordan 84af398d85
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
refactor: add timeout constants for agent execution tiers
Add TimeoutAgentExecution (22m) to handlers for synchronous SDLC
execution, and TimeoutAgent{Default,Medium,Heavy} (12/22/47m) to
workers for tiered agent task execution. Aligns with SDLC action
complexity tiers and prevents inline duration literals.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-11 10:48:24 -07:00

453 lines
15 KiB
Go

// Package sdlc provides SDLC-related adapters.
package sdlc
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/sdlc"
)
// WorkerSDLCExecutor implements port.SDLCExecutor by routing commands through
// the worker pool. Used for skeleton/monorepo projects that don't have a
// dedicated pod.
type WorkerSDLCExecutor struct {
workQueue port.WorkQueue
db *sql.DB
timeout time.Duration
logger *slog.Logger
}
// WorkerSDLCExecutorConfig configures the worker SDLC executor.
type WorkerSDLCExecutorConfig struct {
// WorkQueue for enqueueing SDLC tasks.
WorkQueue port.WorkQueue
// DB for fetching project git clone URLs.
DB *sql.DB
// Timeout is the maximum wait time for task completion (default: 10 minutes).
Timeout time.Duration
Logger *slog.Logger
}
// NewWorkerSDLCExecutor creates a new worker-based SDLC executor.
func NewWorkerSDLCExecutor(cfg WorkerSDLCExecutorConfig) *WorkerSDLCExecutor {
timeout := cfg.Timeout
if timeout == 0 {
timeout = 10 * time.Minute
}
logger := cfg.Logger
if logger == nil {
logger = slog.Default()
}
return &WorkerSDLCExecutor{
workQueue: cfg.WorkQueue,
db: cfg.DB,
timeout: timeout,
logger: logger.With("component", "worker-sdlc-executor"),
}
}
// getGitCloneURL fetches the HTTP clone URL for a project from the database.
func (e *WorkerSDLCExecutor) getGitCloneURL(ctx context.Context, projectID string) (string, error) {
if e.db == nil {
return "", fmt.Errorf("database not configured for worker SDLC executor")
}
var gitCloneHTTP sql.NullString
err := e.db.QueryRowContext(ctx,
`SELECT git_clone_http FROM projects WHERE id = $1`,
projectID,
).Scan(&gitCloneHTTP)
if err != nil {
if err == sql.ErrNoRows {
return "", domain.ErrProjectNotFound
}
return "", fmt.Errorf("failed to get project git URL: %w", err)
}
if !gitCloneHTTP.Valid || gitCloneHTTP.String == "" {
return "", fmt.Errorf("project %s has no git clone URL configured", projectID)
}
return gitCloneHTTP.String, nil
}
// enqueueAndWait enqueues an SDLC task and waits for completion.
func (e *WorkerSDLCExecutor) enqueueAndWait(ctx context.Context, projectID string, spec domain.SDLCTaskSpec) (string, error) {
specMap := map[string]any{
"command": spec.Command,
"args": spec.Args,
"git_clone_url": spec.GitCloneURL,
"auto_commit": spec.AutoCommit,
"auto_push": spec.AutoPush,
}
task := &domain.WorkTask{
ProjectID: projectID,
Type: domain.WorkTaskTypeSDLC,
Spec: specMap,
MaxRetries: 1,
}
taskID, err := e.workQueue.Enqueue(ctx, task)
if err != nil {
return "", fmt.Errorf("failed to enqueue SDLC task: %w", err)
}
e.logger.Info("enqueued SDLC task",
"task_id", taskID,
"project_id", projectID,
"command", spec.Command,
)
return e.waitForCompletion(ctx, taskID)
}
// waitForCompletion polls the work queue until the task completes or times out.
func (e *WorkerSDLCExecutor) waitForCompletion(ctx context.Context, taskID string) (string, error) {
deadline := time.Now().Add(e.timeout)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
if time.Now().After(deadline) {
return "", fmt.Errorf("timeout waiting for SDLC task %s", taskID)
}
task, err := e.workQueue.GetTask(ctx, taskID)
if err != nil {
continue // Task might not exist yet
}
switch task.Status {
case domain.WorkTaskStatusCompleted:
if task.Result != nil {
return task.Result.Output, nil
}
return "", nil
case domain.WorkTaskStatusFailed:
return "", fmt.Errorf("SDLC task failed: %s", task.Error)
case domain.WorkTaskStatusCancelled:
return "", fmt.Errorf("SDLC task was cancelled")
}
// Still pending or running, continue polling
}
}
}
// executeCommand is a helper that builds and executes an SDLC command.
func (e *WorkerSDLCExecutor) executeCommand(ctx context.Context, projectID, command string, args ...string) (string, error) {
gitURL, err := e.getGitCloneURL(ctx, projectID)
if err != nil {
return "", err
}
spec := domain.SDLCTaskSpec{
Command: command,
Args: args,
GitCloneURL: gitURL,
AutoCommit: true,
AutoPush: true,
}
return e.enqueueAndWait(ctx, projectID, spec)
}
// executeReadOnlyCommand executes an SDLC command that doesn't modify state.
func (e *WorkerSDLCExecutor) executeReadOnlyCommand(ctx context.Context, projectID, command string, args ...string) (string, error) {
gitURL, err := e.getGitCloneURL(ctx, projectID)
if err != nil {
return "", err
}
spec := domain.SDLCTaskSpec{
Command: command,
Args: args,
GitCloneURL: gitURL,
AutoCommit: false,
AutoPush: false,
}
return e.enqueueAndWait(ctx, projectID, spec)
}
// GetState returns the global SDLC state for a project.
func (e *WorkerSDLCExecutor) GetState(ctx context.Context, projectID string) (*sdlc.State, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "state")
if err != nil {
return nil, err
}
var state sdlc.State
if err := json.Unmarshal([]byte(output), &state); err != nil {
return nil, fmt.Errorf("parse sdlc state: %w", err)
}
return &state, nil
}
// GetNext returns the classifier's recommendation for the next action.
func (e *WorkerSDLCExecutor) GetNext(ctx context.Context, projectID, feature string) (*sdlc.Classification, error) {
args := []string{}
if feature != "" {
args = append(args, "--feature", feature)
}
output, err := e.executeReadOnlyCommand(ctx, projectID, "next", args...)
if err != nil {
return nil, err
}
var cl sdlc.Classification
if err := json.Unmarshal([]byte(output), &cl); err != nil {
return nil, fmt.Errorf("parse sdlc classification: %w", err)
}
return &cl, nil
}
// ListFeatures returns all features in the project.
func (e *WorkerSDLCExecutor) ListFeatures(ctx context.Context, projectID string) ([]*sdlc.Feature, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "feature", "list")
if err != nil {
return nil, err
}
var features []*sdlc.Feature
if err := json.Unmarshal([]byte(output), &features); err != nil {
return nil, fmt.Errorf("parse sdlc features: %w", err)
}
return features, nil
}
// GetFeature returns a single feature by slug.
func (e *WorkerSDLCExecutor) GetFeature(ctx context.Context, projectID, slug string) (*sdlc.Feature, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "feature", "show", slug)
if err != nil {
return nil, err
}
var f sdlc.Feature
if err := json.Unmarshal([]byte(output), &f); err != nil {
return nil, fmt.Errorf("parse sdlc feature: %w", err)
}
return &f, nil
}
// CreateFeature creates a new feature with the given slug and title.
func (e *WorkerSDLCExecutor) CreateFeature(ctx context.Context, projectID, slug, title string) (*sdlc.Feature, error) {
output, err := e.executeCommand(ctx, projectID, "feature", "create", slug, "--title", title)
if err != nil {
return nil, err
}
var f sdlc.Feature
if err := json.Unmarshal([]byte(output), &f); err != nil {
return nil, fmt.Errorf("parse sdlc feature: %w", err)
}
return &f, nil
}
// TransitionFeature moves a feature to the specified phase.
func (e *WorkerSDLCExecutor) TransitionFeature(ctx context.Context, projectID, slug string, phase sdlc.FeaturePhase) error {
_, err := e.executeCommand(ctx, projectID, "feature", "transition", slug, string(phase))
return err
}
// BlockFeature adds a blocker reason to a feature.
func (e *WorkerSDLCExecutor) BlockFeature(ctx context.Context, projectID, slug, reason string) error {
_, err := e.executeCommand(ctx, projectID, "feature", "block", slug, "--reason", reason)
return err
}
// UnblockFeature removes all blockers from a feature.
func (e *WorkerSDLCExecutor) UnblockFeature(ctx context.Context, projectID, slug string) error {
_, err := e.executeCommand(ctx, projectID, "feature", "unblock", slug)
return err
}
// DeleteFeature removes a feature entirely.
func (e *WorkerSDLCExecutor) DeleteFeature(ctx context.Context, projectID, slug string) error {
_, err := e.executeCommand(ctx, projectID, "feature", "delete", slug, "--force")
return err
}
// GetArtifactStatus returns artifact statuses for a feature.
func (e *WorkerSDLCExecutor) GetArtifactStatus(ctx context.Context, projectID, slug string) (map[sdlc.ArtifactType]*sdlc.Artifact, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "artifact", "status", slug)
if err != nil {
return nil, err
}
var artifacts map[sdlc.ArtifactType]*sdlc.Artifact
if err := json.Unmarshal([]byte(output), &artifacts); err != nil {
return nil, fmt.Errorf("parse sdlc artifacts: %w", err)
}
return artifacts, nil
}
// ApproveArtifact approves a feature artifact.
func (e *WorkerSDLCExecutor) ApproveArtifact(ctx context.Context, projectID, slug string, artType sdlc.ArtifactType) error {
_, err := e.executeCommand(ctx, projectID, "artifact", "approve", slug, string(artType))
return err
}
// RejectArtifact rejects a feature artifact.
func (e *WorkerSDLCExecutor) RejectArtifact(ctx context.Context, projectID, slug string, artType sdlc.ArtifactType) error {
_, err := e.executeCommand(ctx, projectID, "artifact", "reject", slug, string(artType))
return err
}
// PassArtifact marks a feature artifact as passed.
func (e *WorkerSDLCExecutor) PassArtifact(ctx context.Context, projectID, slug string, artType sdlc.ArtifactType) error {
_, err := e.executeCommand(ctx, projectID, "artifact", "pass", slug, string(artType))
return err
}
// FailArtifact marks a feature artifact as failed.
func (e *WorkerSDLCExecutor) FailArtifact(ctx context.Context, projectID, slug string, artType sdlc.ArtifactType) error {
_, err := e.executeCommand(ctx, projectID, "artifact", "fail", slug, string(artType))
return err
}
// NeedsFixArtifact marks a feature artifact as needing fixes.
func (e *WorkerSDLCExecutor) NeedsFixArtifact(ctx context.Context, projectID, slug string, artType sdlc.ArtifactType) error {
_, err := e.executeCommand(ctx, projectID, "artifact", "needs-fix", slug, string(artType))
return err
}
// ListTasks returns all tasks for a feature.
func (e *WorkerSDLCExecutor) ListTasks(ctx context.Context, projectID, slug string) ([]sdlc.Task, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "task", "list", slug)
if err != nil {
return nil, err
}
var tasks []sdlc.Task
if err := json.Unmarshal([]byte(output), &tasks); err != nil {
return nil, fmt.Errorf("parse sdlc tasks: %w", err)
}
return tasks, nil
}
// AddTask adds a new task to a feature.
func (e *WorkerSDLCExecutor) AddTask(ctx context.Context, projectID, slug, title string) (*sdlc.Task, error) {
output, err := e.executeCommand(ctx, projectID, "task", "add", slug, "--title", title)
if err != nil {
return nil, err
}
var t sdlc.Task
if err := json.Unmarshal([]byte(output), &t); err != nil {
return nil, fmt.Errorf("parse sdlc task: %w", err)
}
return &t, nil
}
// StartTask marks a task as in-progress.
func (e *WorkerSDLCExecutor) StartTask(ctx context.Context, projectID, slug, taskID string) error {
_, err := e.executeCommand(ctx, projectID, "task", "start", slug, taskID)
return err
}
// CompleteTask marks a task as complete.
func (e *WorkerSDLCExecutor) CompleteTask(ctx context.Context, projectID, slug, taskID string) error {
_, err := e.executeCommand(ctx, projectID, "task", "complete", slug, taskID)
return err
}
// BlockTask marks a task as blocked.
func (e *WorkerSDLCExecutor) BlockTask(ctx context.Context, projectID, slug, taskID string) error {
_, err := e.executeCommand(ctx, projectID, "task", "block", slug, taskID)
return err
}
// QueryBlocked returns all blocked features.
func (e *WorkerSDLCExecutor) QueryBlocked(ctx context.Context, projectID string) ([]port.BlockedInfo, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "query", "blocked")
if err != nil {
return nil, err
}
var blocked []port.BlockedInfo
if err := json.Unmarshal([]byte(output), &blocked); err != nil {
return nil, fmt.Errorf("parse sdlc blocked query: %w", err)
}
return blocked, nil
}
// QueryReady returns features ready for work.
func (e *WorkerSDLCExecutor) QueryReady(ctx context.Context, projectID string) ([]port.ReadyInfo, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "query", "ready")
if err != nil {
return nil, err
}
var ready []port.ReadyInfo
if err := json.Unmarshal([]byte(output), &ready); err != nil {
return nil, fmt.Errorf("parse sdlc ready query: %w", err)
}
return ready, nil
}
// QueryNeedsApproval returns features awaiting approval.
func (e *WorkerSDLCExecutor) QueryNeedsApproval(ctx context.Context, projectID string) ([]port.ApprovalInfo, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "query", "needs-approval")
if err != nil {
return nil, err
}
var pending []port.ApprovalInfo
if err := json.Unmarshal([]byte(output), &pending); err != nil {
return nil, fmt.Errorf("parse sdlc approval query: %w", err)
}
return pending, nil
}
// CreateBranch creates a feature branch and its manifest.
func (e *WorkerSDLCExecutor) CreateBranch(ctx context.Context, projectID, slug string) (*sdlc.BranchManifest, error) {
output, err := e.executeCommand(ctx, projectID, "branch", "create", slug)
if err != nil {
return nil, err
}
var manifest sdlc.BranchManifest
if err := json.Unmarshal([]byte(output), &manifest); err != nil {
return nil, fmt.Errorf("parse sdlc branch manifest: %w", err)
}
return &manifest, nil
}
// GetBranchStatus returns the full branch status including checklist.
func (e *WorkerSDLCExecutor) GetBranchStatus(ctx context.Context, projectID, slug string) (*port.BranchStatus, error) {
output, err := e.executeReadOnlyCommand(ctx, projectID, "branch", "status", slug)
if err != nil {
return nil, err
}
var result port.BranchStatus
if err := json.Unmarshal([]byte(output), &result); err != nil {
return nil, fmt.Errorf("parse sdlc branch status: %w", err)
}
return &result, nil
}
// SyncBranch syncs a feature branch with its base branch.
func (e *WorkerSDLCExecutor) SyncBranch(ctx context.Context, projectID, slug string) error {
_, err := e.executeCommand(ctx, projectID, "branch", "sync", slug)
return err
}
// MergeFeature merges a feature branch after all gates pass.
func (e *WorkerSDLCExecutor) MergeFeature(ctx context.Context, projectID, slug, strategy string) error {
args := []string{slug}
if strategy != "" {
args = append(args, "--strategy", strategy)
}
_, err := e.executeCommand(ctx, projectID, "merge", args...)
return err
}
// ArchiveFeature archives a released feature.
func (e *WorkerSDLCExecutor) ArchiveFeature(ctx context.Context, projectID, slug string) error {
_, err := e.executeCommand(ctx, projectID, "archive", slug)
return err
}
// Compile-time interface check.
var _ port.SDLCExecutor = (*WorkerSDLCExecutor)(nil)