rdev/internal/worker/api_client.go
jordan d7a6f37593
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix: worker graceful shutdown and RWO PVC compatibility
- Add WaitGroup for graceful shutdown of in-flight tasks
- Change replicas to 1 with Recreate strategy (RWO PVC limitation)
- Optimize Dockerfile: combine RUN commands for smaller layers
- Add compiled binaries to .gitignore

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 00:35:00 -07:00

327 lines
9.6 KiB
Go

package worker
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/orchard9/rdev/internal/domain"
)
// APIClient is an HTTP client for standalone workers to communicate with rdev-api.
// It bundles the server base URL, an optional API key, and the *http.Client
// that every request method on this type goes through.
type APIClient struct {
// baseURL is the prefix that endpoint paths such as "/workers/register" are
// appended to when a request URL is built.
baseURL string
// apiKey, when non-empty, is sent as the X-API-Key header by setHeaders.
apiKey string
// httpClient performs the requests; NewAPIClient sets its Timeout.
httpClient *http.Client
}
// APIClientConfig holds configuration for the API client.
// NewAPIClient reads these values to build an APIClient.
type APIClientConfig struct {
// BaseURL is the base URL of the rdev-api server. Endpoint paths such as
// "/workers/register" are appended to it when requests are built.
BaseURL string
// APIKey is the API key for authentication. When non-empty it is sent in the
// X-API-Key request header; when empty no key header is set.
APIKey string
// Timeout is the default request timeout, applied to the underlying
// http.Client. A zero value makes NewAPIClient use 30 seconds.
Timeout time.Duration
}
// NewAPIClient creates a new API client for standalone workers.
//
// Trailing slashes on cfg.BaseURL are removed: every endpoint path appended by
// the request methods already starts with "/", so keeping them would produce
// "//" in the request URL. A zero or negative cfg.Timeout is replaced by a
// 30 second default so the returned client always has an overall request
// deadline (http.Client treats a non-positive Timeout as "no timeout").
func NewAPIClient(cfg APIClientConfig) *APIClient {
	timeout := cfg.Timeout
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	return &APIClient{
		baseURL:    strings.TrimRight(cfg.BaseURL, "/"),
		apiKey:     cfg.APIKey,
		httpClient: &http.Client{Timeout: timeout},
	}
}
// RegisterRequest is the request to register a worker.
// Register marshals it to JSON and sends it as the body of the
// POST /workers/register call.
type RegisterRequest struct {
// ID and Hostname are always present in the JSON output.
ID string `json:"id"`
Hostname string `json:"hostname"`
// Version and Capabilities are left out of the JSON when empty.
Version string `json:"version,omitempty"`
Capabilities []string `json:"capabilities,omitempty"`
}
// RegisterResponse is the response from registering a worker.
// It describes a JSON envelope made of a success flag, a data object carrying
// the worker record, and an optional error string.
// NOTE(review): Register in this file only inspects the HTTP status code and
// does not decode into this type — confirm whether other code relies on it.
type RegisterResponse struct {
Success bool `json:"success"`
Data struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
Status string `json:"status"`
Capabilities []string `json:"capabilities,omitempty"`
// RegisteredAt and LastHeartbeat are kept as raw strings; their format is
// not established here — presumably RFC 3339 timestamps, TODO confirm.
RegisteredAt string `json:"registered_at"`
LastHeartbeat string `json:"last_heartbeat"`
Version string `json:"version,omitempty"`
} `json:"data"`
Error string `json:"error,omitempty"`
}
// Register registers the worker with rdev-api.
//
// req is marshalled to JSON and POSTed to <baseURL>/workers/register. A 200 OK
// or 201 Created response counts as success; the success body is not decoded.
// Any other status yields an error containing the status code and at most
// 64 KiB of the response body. A nil req is rejected before any network I/O
// instead of being sent as a JSON "null" body.
func (c *APIClient) Register(ctx context.Context, req *RegisterRequest) error {
	// maxBody bounds both the error text read from a failed response and the
	// amount drained from the body before it is closed.
	const maxBody = 64 << 10

	if req == nil {
		return fmt.Errorf("register: nil request")
	}

	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("marshal request: %w", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/workers/register", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("register: %w", err)
	}
	defer func() {
		// Read the remainder of the body before closing: the transport can
		// only reuse the connection once the body has been read to EOF.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBody))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		bodyBytes, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBody))
		if readErr != nil {
			return fmt.Errorf("register returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
		}
		return fmt.Errorf("register returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// Heartbeat sends a heartbeat to keep the worker alive.
//
// It POSTs (without a body) to <baseURL>/workers/<workerID>/heartbeat, where
// workerID is escaped as a single path segment so characters such as "/" or
// "?" cannot alter the route. Only 200 OK counts as success; any other status
// yields an error containing the status code and at most 64 KiB of the
// response body.
func (c *APIClient) Heartbeat(ctx context.Context, workerID string) error {
	// maxBody bounds both the error text read from a failed response and the
	// amount drained from the body before it is closed.
	const maxBody = 64 << 10

	endpoint := c.baseURL + "/workers/" + url.PathEscape(workerID) + "/heartbeat"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("heartbeat: %w", err)
	}
	defer func() {
		// Heartbeats repeat for the life of the worker; reading the body to
		// EOF before closing lets the transport reuse one connection for
		// them instead of dialing again each time.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBody))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		bodyBytes, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBody))
		if readErr != nil {
			return fmt.Errorf("heartbeat returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
		}
		return fmt.Errorf("heartbeat returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// ClaimTaskResponse is the response from claiming a task.
// ClaimTask decodes the body of a 200 OK response into this type.
type ClaimTaskResponse struct {
Success bool `json:"success"`
Data struct {
// Task is the claimed task. ClaimTask reports an error when a 200 response
// decodes with Task left nil.
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
} `json:"data"`
Error string `json:"error,omitempty"`
}
// WorkTaskData is the task data returned from the API.
// It is the JSON-facing shape of a task; ToWorkTask converts it into a
// domain.WorkTask.
type WorkTaskData struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
// Type is converted to domain.WorkTaskType by ToWorkTask.
Type string `json:"type"`
// Spec holds the task's decoded JSON object and is passed through unchanged.
Spec map[string]any `json:"spec"`
// Status is converted to domain.WorkTaskStatus by ToWorkTask.
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
// CreatedAt and StartedAt are timestamp strings that ToWorkTask parses with
// the time.RFC3339 layout; either may be empty, in which case it is skipped.
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// ToWorkTask converts the API task data to a domain work task.
//
// A nil receiver yields nil. Scalar fields are copied across, with Type and
// Status converted to their domain string types. CreatedAt and StartedAt are
// parsed with the time.RFC3339 layout: an empty or unparseable CreatedAt
// leaves the zero time, and an empty or unparseable StartedAt leaves
// StartedAt nil, so a malformed timestamp is never reported as a task that
// started at the zero time.
func (d *WorkTaskData) ToWorkTask() *domain.WorkTask {
	if d == nil {
		return nil
	}

	task := &domain.WorkTask{
		ID:          d.ID,
		ProjectID:   d.ProjectID,
		Type:        domain.WorkTaskType(d.Type),
		Spec:        d.Spec,
		Status:      domain.WorkTaskStatus(d.Status),
		Priority:    d.Priority,
		WorkerID:    d.WorkerID,
		CallbackURL: d.CallbackURL,
		RetryCount:  d.RetryCount,
		MaxRetries:  d.MaxRetries,
	}

	if d.CreatedAt != "" {
		if created, err := time.Parse(time.RFC3339, d.CreatedAt); err == nil {
			task.CreatedAt = created
		}
	}
	if d.StartedAt != "" {
		// Only publish a pointer when parsing succeeded; otherwise callers
		// checking StartedAt != nil would see a bogus zero timestamp.
		if started, err := time.Parse(time.RFC3339, d.StartedAt); err == nil {
			task.StartedAt = &started
		}
	}
	return task
}
// ClaimTask claims the next available task from the queue.
// Returns nil if no tasks are available.
//
// It POSTs (without a body) to <baseURL>/workers/<workerID>/claim, where
// workerID is escaped as a single path segment. The outcome depends on the
// response status:
//   - 204 No Content: no task is available; returns (nil, nil).
//   - 200 OK: the body is decoded as a ClaimTaskResponse and its task is
//     converted with ToWorkTask. A missing task is an error, which includes
//     the server-supplied error string when one is present.
//   - anything else: an error containing the status code and at most 64 KiB
//     of the response body.
func (c *APIClient) ClaimTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
	// maxBody bounds both the error text read from a failed response and the
	// amount drained from the body before it is closed.
	const maxBody = 64 << 10

	endpoint := c.baseURL + "/workers/" + url.PathEscape(workerID) + "/claim"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("claim task: %w", err)
	}
	defer func() {
		// The JSON decoder may stop before EOF; read what is left so the
		// transport can reuse the connection for the next poll.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBody))
		_ = resp.Body.Close()
	}()

	switch resp.StatusCode {
	case http.StatusNoContent:
		// 204 No Content = no tasks available
		return nil, nil
	case http.StatusOK:
		// Decoded below.
	default:
		bodyBytes, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBody))
		if readErr != nil {
			return nil, fmt.Errorf("claim task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
		}
		return nil, fmt.Errorf("claim task returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}

	var result ClaimTaskResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	if result.Data.Task == nil {
		if result.Error != "" {
			return nil, fmt.Errorf("API returned success but no task data: %s", result.Error)
		}
		return nil, fmt.Errorf("API returned success but no task data")
	}
	return result.Data.Task.ToWorkTask(), nil
}
// CompleteTaskRequest is the request to complete a task.
// CompleteTask fills it from a domain.BuildResult and sends it as the JSON
// body of the completion call.
type CompleteTaskRequest struct {
// Success is always present in the JSON output.
Success bool `json:"success"`
// The remaining fields are left out of the JSON when they hold their zero
// value (empty string, nil or empty slice/map, 0).
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
// DurationMs is a duration, in milliseconds going by the name and JSON key.
DurationMs int64 `json:"duration_ms,omitempty"`
}
// CompleteTask marks a task as complete.
//
// The fields of result are copied into a CompleteTaskRequest and POSTed as
// JSON to <baseURL>/workers/<workerID>/complete/<taskID>; both identifiers are
// escaped as single path segments. Only 200 OK counts as success; any other
// status yields an error containing the status code and at most 64 KiB of the
// response body. A nil result is rejected with an error instead of causing a
// nil-pointer panic.
func (c *APIClient) CompleteTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult) error {
	// maxBody bounds both the error text read from a failed response and the
	// amount drained from the body before it is closed.
	const maxBody = 64 << 10

	if result == nil {
		return fmt.Errorf("complete task: nil result")
	}

	// Nil slices and maps need no special casing: the omitempty tags on
	// CompleteTaskRequest drop them from the JSON either way.
	req := &CompleteTaskRequest{
		Success:      result.Success,
		Output:       result.Output,
		Error:        result.Error,
		CommitSHA:    result.CommitSHA,
		FilesChanged: result.FilesChanged,
		Artifacts:    result.Artifacts,
		DurationMs:   result.DurationMs,
	}
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("marshal request: %w", err)
	}

	endpoint := c.baseURL + "/workers/" + url.PathEscape(workerID) + "/complete/" + url.PathEscape(taskID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("complete task: %w", err)
	}
	defer func() {
		// Read the remainder of the body before closing so the transport can
		// reuse the connection.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBody))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		bodyBytes, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBody))
		if readErr != nil {
			return fmt.Errorf("complete task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
		}
		return fmt.Errorf("complete task returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// FailTaskRequest is the request to fail a task.
// FailTask builds it from its arguments and sends it as the JSON body of the
// failure call.
type FailTaskRequest struct {
// Error is always present in the JSON output, even when empty.
Error string `json:"error"`
// Output and DurationMs are left out of the JSON when empty / zero.
Output string `json:"output,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
// FailTask marks a task as failed.
//
// errMsg, output and durationMs are sent as a FailTaskRequest JSON body in a
// POST to <baseURL>/workers/<workerID>/fail/<taskID>; both identifiers are
// escaped as single path segments so characters such as "/" or "?" cannot
// alter the route. Only 200 OK counts as success; any other status yields an
// error containing the status code and at most 64 KiB of the response body.
func (c *APIClient) FailTask(ctx context.Context, workerID, taskID string, errMsg, output string, durationMs int64) error {
	// maxBody bounds both the error text read from a failed response and the
	// amount drained from the body before it is closed.
	const maxBody = 64 << 10

	body, err := json.Marshal(&FailTaskRequest{
		Error:      errMsg,
		Output:     output,
		DurationMs: durationMs,
	})
	if err != nil {
		return fmt.Errorf("marshal request: %w", err)
	}

	endpoint := c.baseURL + "/workers/" + url.PathEscape(workerID) + "/fail/" + url.PathEscape(taskID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("fail task: %w", err)
	}
	defer func() {
		// Read the remainder of the body before closing so the transport can
		// reuse the connection.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBody))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		bodyBytes, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBody))
		if readErr != nil {
			return fmt.Errorf("fail task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
		}
		return fmt.Errorf("fail task returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// setHeaders applies the headers shared by every rdev-api request: a JSON
// Content-Type, plus X-API-Key when the client was configured with a key.
func (c *APIClient) setHeaders(req *http.Request) {
	headers := req.Header
	headers.Set("Content-Type", "application/json")
	if key := c.apiKey; key != "" {
		headers.Set("X-API-Key", key)
	}
}