Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
- Add WaitGroup for graceful shutdown of in-flight tasks
- Change replicas to 1 with Recreate strategy (RWO PVC limitation)
- Optimize Dockerfile: combine RUN commands for smaller layers
- Add compiled binaries to .gitignore

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
327 lines
9.6 KiB
Go
327 lines
9.6 KiB
Go
package worker
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
)
|
|
|
|
// APIClient is an HTTP client for standalone workers to communicate with
// rdev-api.
type APIClient struct {
	// baseURL is prepended verbatim to every request path.
	baseURL string
	// apiKey, when non-empty, is sent in the X-API-Key header.
	apiKey string
	// httpClient performs all requests issued by this client.
	httpClient *http.Client
}
|
|
|
|
// APIClientConfig holds configuration for the API client.
type APIClientConfig struct {
	// BaseURL is the base URL of the rdev-api server.
	BaseURL string

	// APIKey is the API key for authentication.
	APIKey string

	// Timeout is the default request timeout. NewAPIClient substitutes
	// 30 seconds when it is left at zero.
	Timeout time.Duration
}
|
|
|
|
// NewAPIClient creates a new API client for standalone workers.
|
|
func NewAPIClient(cfg APIClientConfig) *APIClient {
|
|
if cfg.Timeout == 0 {
|
|
cfg.Timeout = 30 * time.Second
|
|
}
|
|
return &APIClient{
|
|
baseURL: cfg.BaseURL,
|
|
apiKey: cfg.APIKey,
|
|
httpClient: &http.Client{
|
|
Timeout: cfg.Timeout,
|
|
},
|
|
}
|
|
}
|
|
|
|
// RegisterRequest is the JSON body sent when registering a worker.
//
// ID and Hostname are always serialized; Version and Capabilities are
// omitted from the payload when empty.
type RegisterRequest struct {
	ID           string   `json:"id"`
	Hostname     string   `json:"hostname"`
	Version      string   `json:"version,omitempty"`
	Capabilities []string `json:"capabilities,omitempty"`
}
|
|
|
|
// RegisterResponse is the JSON envelope returned from registering a worker.
//
// Data describes the registered worker; its timestamp fields are carried as
// plain strings. Error holds the error text from the envelope and is omitted
// when empty.
type RegisterResponse struct {
	Success bool `json:"success"`
	Data    struct {
		ID            string   `json:"id"`
		Hostname      string   `json:"hostname"`
		Status        string   `json:"status"`
		Capabilities  []string `json:"capabilities,omitempty"`
		RegisteredAt  string   `json:"registered_at"`
		LastHeartbeat string   `json:"last_heartbeat"`
		Version       string   `json:"version,omitempty"`
	} `json:"data"`
	Error string `json:"error,omitempty"`
}
|
|
|
|
// Register registers the worker with rdev-api.
|
|
func (c *APIClient) Register(ctx context.Context, req *RegisterRequest) error {
|
|
body, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/workers/register", bytes.NewReader(body))
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("register: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
|
if readErr != nil {
|
|
return fmt.Errorf("register returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
|
|
}
|
|
return fmt.Errorf("register returned status %d: %s", resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Heartbeat sends a heartbeat to keep the worker alive.
|
|
func (c *APIClient) Heartbeat(ctx context.Context, workerID string) error {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
fmt.Sprintf("%s/workers/%s/heartbeat", c.baseURL, workerID), nil)
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("heartbeat: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
|
if readErr != nil {
|
|
return fmt.Errorf("heartbeat returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
|
|
}
|
|
return fmt.Errorf("heartbeat returned status %d: %s", resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ClaimTaskResponse is the response from claiming a task.
|
|
type ClaimTaskResponse struct {
|
|
Success bool `json:"success"`
|
|
Data struct {
|
|
Task *WorkTaskData `json:"task"`
|
|
WorkerID string `json:"worker_id"`
|
|
} `json:"data"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// WorkTaskData is the task data returned from the API.
//
// Type and Status are plain strings here and are converted to their domain
// types by ToWorkTask. CreatedAt and StartedAt are carried as strings;
// ToWorkTask parses them as RFC 3339.
type WorkTaskData struct {
	ID          string         `json:"id"`
	ProjectID   string         `json:"project_id"`
	Type        string         `json:"type"`
	Spec        map[string]any `json:"spec"`
	Status      string         `json:"status"`
	Priority    int            `json:"priority"`
	WorkerID    string         `json:"worker_id,omitempty"`
	CallbackURL string         `json:"callback_url,omitempty"`
	CreatedAt   string         `json:"created_at"`
	StartedAt   string         `json:"started_at,omitempty"`
	RetryCount  int            `json:"retry_count"`
	MaxRetries  int            `json:"max_retries"`
}
|
|
|
|
// ToWorkTask converts the API task data to a domain work task.
|
|
func (d *WorkTaskData) ToWorkTask() *domain.WorkTask {
|
|
if d == nil {
|
|
return nil
|
|
}
|
|
task := &domain.WorkTask{
|
|
ID: d.ID,
|
|
ProjectID: d.ProjectID,
|
|
Type: domain.WorkTaskType(d.Type),
|
|
Spec: d.Spec,
|
|
Status: domain.WorkTaskStatus(d.Status),
|
|
Priority: d.Priority,
|
|
WorkerID: d.WorkerID,
|
|
CallbackURL: d.CallbackURL,
|
|
RetryCount: d.RetryCount,
|
|
MaxRetries: d.MaxRetries,
|
|
}
|
|
if d.CreatedAt != "" {
|
|
task.CreatedAt, _ = time.Parse(time.RFC3339, d.CreatedAt)
|
|
}
|
|
if d.StartedAt != "" {
|
|
t, _ := time.Parse(time.RFC3339, d.StartedAt)
|
|
task.StartedAt = &t
|
|
}
|
|
return task
|
|
}
|
|
|
|
// ClaimTask claims the next available task from the queue.
|
|
// Returns nil if no tasks are available.
|
|
func (c *APIClient) ClaimTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
fmt.Sprintf("%s/workers/%s/claim", c.baseURL, workerID), nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("claim task: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
// 204 No Content = no tasks available
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
return nil, nil
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("claim task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
|
|
}
|
|
return nil, fmt.Errorf("claim task returned status %d: %s", resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
var result ClaimTaskResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("decode response: %w", err)
|
|
}
|
|
|
|
if result.Data.Task == nil {
|
|
return nil, fmt.Errorf("API returned success but no task data")
|
|
}
|
|
return result.Data.Task.ToWorkTask(), nil
|
|
}
|
|
|
|
// CompleteTaskRequest is the JSON body sent to complete a task.
//
// Success is always serialized; every other field is omitted from the
// payload when it holds its zero value.
type CompleteTaskRequest struct {
	Success      bool              `json:"success"`
	Output       string            `json:"output,omitempty"`
	Error        string            `json:"error,omitempty"`
	CommitSHA    string            `json:"commit_sha,omitempty"`
	FilesChanged []string          `json:"files_changed,omitempty"`
	Artifacts    map[string]string `json:"artifacts,omitempty"`
	DurationMs   int64             `json:"duration_ms,omitempty"`
}
|
|
|
|
// CompleteTask marks a task as complete.
|
|
func (c *APIClient) CompleteTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult) error {
|
|
req := &CompleteTaskRequest{
|
|
Success: result.Success,
|
|
Output: result.Output,
|
|
Error: result.Error,
|
|
CommitSHA: result.CommitSHA,
|
|
DurationMs: result.DurationMs,
|
|
}
|
|
if result.FilesChanged != nil {
|
|
req.FilesChanged = result.FilesChanged
|
|
}
|
|
if result.Artifacts != nil {
|
|
req.Artifacts = result.Artifacts
|
|
}
|
|
|
|
body, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
fmt.Sprintf("%s/workers/%s/complete/%s", c.baseURL, workerID, taskID), bytes.NewReader(body))
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("complete task: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
|
if readErr != nil {
|
|
return fmt.Errorf("complete task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
|
|
}
|
|
return fmt.Errorf("complete task returned status %d: %s", resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FailTaskRequest is the JSON body sent to fail a task.
//
// Error is always serialized, even when empty; Output and DurationMs are
// omitted from the payload when they hold their zero value.
type FailTaskRequest struct {
	Error      string `json:"error"`
	Output     string `json:"output,omitempty"`
	DurationMs int64  `json:"duration_ms,omitempty"`
}
|
|
|
|
// FailTask marks a task as failed.
|
|
func (c *APIClient) FailTask(ctx context.Context, workerID, taskID string, errMsg, output string, durationMs int64) error {
|
|
req := &FailTaskRequest{
|
|
Error: errMsg,
|
|
Output: output,
|
|
DurationMs: durationMs,
|
|
}
|
|
|
|
body, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
fmt.Sprintf("%s/workers/%s/fail/%s", c.baseURL, workerID, taskID), bytes.NewReader(body))
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("fail task: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
|
if readErr != nil {
|
|
return fmt.Errorf("fail task returned status %d (failed to read body: %w)", resp.StatusCode, readErr)
|
|
}
|
|
return fmt.Errorf("fail task returned status %d: %s", resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// setHeaders sets common headers on the request.
|
|
func (c *APIClient) setHeaders(req *http.Request) {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if c.apiKey != "" {
|
|
req.Header.Set("X-API-Key", c.apiKey)
|
|
}
|
|
}
|