From d7a6f3759344c09bf3a733abbac7ab8a0a8e9036 Mon Sep 17 00:00:00 2001 From: jordan Date: Fri, 6 Feb 2026 00:35:00 -0700 Subject: [PATCH] fix: worker graceful shutdown and RWO PVC compatibility - Add WaitGroup for graceful shutdown of in-flight tasks - Change replicas to 1 with Recreate strategy (RWO PVC limitation) - Optimize Dockerfile: combine RUN commands for smaller layers - Add compiled binaries to .gitignore Co-Authored-By: Claude Opus 4.5 --- .gitignore | 6 ++++++ Dockerfile | 4 ++-- cmd/rdev-worker/main.go | 30 +++++++++++++++++++++------ deployments/k8s/base/rdev-worker.yaml | 6 +++++- internal/adapter/claudebox/client.go | 30 +++++++++++++++++++++------ internal/worker/api_client.go | 28 ++++++++++++++++++++----- 6 files changed, 84 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index eff5848..5472f1f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,9 @@ tmp/ *-deploy-key.pub *-deploy-key.b64 .agentive-remediation/ + +# Compiled binaries +/rdev-worker +/rdev-api +/claudebox-sidecar +/sdlc diff --git a/Dockerfile b/Dockerfile index 2fab4bc..fd3ea36 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,8 +7,8 @@ WORKDIR /build COPY go.mod go.sum ./ RUN go mod download COPY . . -RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o sdlc ./cmd/sdlc -RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o claudebox-sidecar ./cmd/claudebox-sidecar +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o sdlc ./cmd/sdlc && \ + CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o claudebox-sidecar ./cmd/claudebox-sidecar # Runtime stage FROM ubuntu:22.04 diff --git a/cmd/rdev-worker/main.go b/cmd/rdev-worker/main.go index 821a98f..8ec6c29 100644 --- a/cmd/rdev-worker/main.go +++ b/cmd/rdev-worker/main.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" @@ -83,11 +84,14 @@ func main() { WorkDir: "/workspace", }) + // WaitGroup to track in-flight tasks for graceful shutdown + var wg sync.WaitGroup + // Start heartbeat loop go runHeartbeat(ctx, apiClient, cfg.WorkerID, cfg.HeartbeatInterval, log) // Start work loop - go runWorkLoop(ctx, apiClient, buildExecutor, sdlcExecutor, cfg, log) + go runWorkLoop(ctx, apiClient, buildExecutor, sdlcExecutor, cfg, log, &wg) // Wait for shutdown signal quit := make(chan os.Signal, 1) @@ -97,9 +101,18 @@ func main() { log.Info("shutting down worker") cancel() - // Give ongoing work a chance to complete - time.Sleep(5 * time.Second) - log.Info("worker stopped") + // Wait for in-flight tasks to complete with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + log.Info("all tasks completed, worker stopped") + case <-time.After(cfg.TaskTimeout): + log.Warn("shutdown timeout, some tasks may be incomplete") + } } // Config holds worker configuration. @@ -180,6 +193,7 @@ func runWorkLoop( sdlcExecutor *worker.HTTPSDLCTaskExecutor, cfg *Config, log *logging.Logger, + wg *sync.WaitGroup, ) { ticker := time.NewTicker(cfg.PollInterval) defer ticker.Stop() @@ -206,8 +220,12 @@ func runWorkLoop( "type", task.Type, ) - // Execute the task - executeTask(ctx, client, buildExecutor, sdlcExecutor, task, cfg, log) + // Execute the task with WaitGroup tracking + wg.Add(1) + go func(t *domain.WorkTask) { + defer wg.Done() + executeTask(ctx, client, buildExecutor, sdlcExecutor, t, cfg, log) + }(task) } } } diff --git a/deployments/k8s/base/rdev-worker.yaml b/deployments/k8s/base/rdev-worker.yaml index cffeab9..5448477 100644 --- a/deployments/k8s/base/rdev-worker.yaml +++ b/deployments/k8s/base/rdev-worker.yaml @@ -9,7 +9,11 @@ metadata: app.kubernetes.io/name: rdev-worker app.kubernetes.io/part-of: rdev spec: - replicas: 2 + replicas: 1 + # Recreate strategy required: claudebox-claude-config PVC is RWO (ReadWriteOnce) + # and cannot be attached to multiple pods simultaneously + strategy: + type: Recreate selector: matchLabels: app: rdev-worker diff --git a/internal/adapter/claudebox/client.go b/internal/adapter/claudebox/client.go index 2b645f0..b2bd005 100644 --- a/internal/adapter/claudebox/client.go +++ b/internal/adapter/claudebox/client.go @@ -117,7 +117,10 @@ func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*ExecuteResp defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("execute returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("execute returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -163,7 +166,10 @@ func (c *Client) ExecuteStream(ctx context.Context, req *ExecuteRequest, handler defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("execute stream returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return fmt.Errorf("execute stream returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -233,7 +239,10 @@ func (c *Client) GitClone(ctx context.Context, cloneURL, workDir string) (*GitCl defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("git clone returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("git clone returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -288,7 +297,10 @@ func (c *Client) GitCommitAndPush(ctx context.Context, message string, push bool defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("git commit returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("git commit returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -328,7 +340,10 @@ func (c *Client) GitStatus(ctx context.Context, workDir string) (*GitStatusRespo defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("git status returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("git status returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -381,7 +396,10 @@ func (c *Client) RunSDLC(ctx context.Context, command string, args []string, wor defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("sdlc returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("sdlc returned status %d: %s", resp.StatusCode, string(bodyBytes)) } diff --git a/internal/worker/api_client.go b/internal/worker/api_client.go index e56bc2d..821aa47 100644 --- a/internal/worker/api_client.go +++ b/internal/worker/api_client.go @@ -88,7 +88,10 @@ func (c *APIClient) Register(ctx context.Context, req *RegisterRequest) error { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("register returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return fmt.Errorf("register returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -111,7 +114,10 @@ func (c *APIClient) Heartbeat(ctx context.Context, workerID string) error { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("heartbeat returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return fmt.Errorf("heartbeat returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -193,7 +199,10 @@ func (c *APIClient) ClaimTask(ctx context.Context, workerID string) (*domain.Wor } if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("claim task returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return nil, fmt.Errorf("claim task returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -202,6 +211,9 @@ func (c *APIClient) ClaimTask(ctx context.Context, workerID string) (*domain.Wor return nil, fmt.Errorf("decode response: %w", err) } + if result.Data.Task == nil { + return nil, fmt.Errorf("API returned success but no task data") + } return result.Data.Task.ToWorkTask(), nil } @@ -251,7 +263,10 @@ func (c *APIClient) CompleteTask(ctx context.Context, workerID, taskID string, r defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("complete task returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return fmt.Errorf("complete task returned status %d: %s", resp.StatusCode, string(bodyBytes)) } @@ -292,7 +307,10 @@ func (c *APIClient) FailTask(ctx context.Context, workerID, taskID string, errMs defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("fail task returned status %d (failed to read body: %w)", resp.StatusCode, readErr) + } return fmt.Errorf("fail task returned status %d: %s", resp.StatusCode, string(bodyBytes)) }