rdev/internal/service/work_service.go
jordan d69da6d627 feat: add structured logging infrastructure and SDLC extensions
Major changes:
- Add internal/logging package with field constants, context propagation,
  sensitive data auto-redaction, and per-component log levels
- Add worker timeout constants (TimeoutQuickOp, TimeoutHealthCheck, etc.)
- Extend SDLC with callback handlers, generate endpoints, and executor
- Add new cookbook trees for aeries and slackpath progression
- Add skeleton templates for queue, realtime, and microservices
- Add worker component template with async job processing
- Refactor services and handlers to use new logging infrastructure
- Split component.go into component_infra.go and component_listing.go

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 22:56:04 -07:00

280 lines
7.8 KiB
Go

// Package service provides business logic services.
package service
import (
"context"
"fmt"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/webhook"
)
// WorkService orchestrates work queue operations.
// It coordinates task enqueueing, completion, and webhook notifications.
type WorkService struct {
queue port.WorkQueue
webhookDispatcher *webhook.Dispatcher
}
// NewWorkService creates a new work service.
func NewWorkService(queue port.WorkQueue) *WorkService {
return &WorkService{
queue: queue,
}
}
// WithWebhookDispatcher adds a webhook dispatcher for task completion notifications.
func (s *WorkService) WithWebhookDispatcher(dispatcher *webhook.Dispatcher) *WorkService {
s.webhookDispatcher = dispatcher
return s
}
// EnqueueTask adds a new task to the work queue.
func (s *WorkService) EnqueueTask(ctx context.Context, req EnqueueTaskRequest) (*EnqueueTaskResult, error) {
// Validate required fields
if req.ProjectID == "" {
return nil, fmt.Errorf("project_id is required")
}
if req.Type == "" {
return nil, fmt.Errorf("task_type is required")
}
// Set defaults
maxRetries := req.MaxRetries
if maxRetries == 0 {
maxRetries = 3
}
task := &domain.WorkTask{
ProjectID: req.ProjectID,
Type: req.Type,
Spec: req.Spec,
Priority: req.Priority,
CallbackURL: req.CallbackURL,
MaxRetries: maxRetries,
}
taskID, err := s.queue.Enqueue(ctx, task)
if err != nil {
return nil, fmt.Errorf("enqueue task: %w", err)
}
log := logging.FromContext(ctx).WithService("work")
log.Info("task enqueued",
"task_id", taskID,
logging.FieldProjectID, req.ProjectID,
"type", req.Type,
"priority", req.Priority,
)
return &EnqueueTaskResult{
TaskID: taskID,
StatusURL: fmt.Sprintf("/work/%s/status", taskID),
}, nil
}
// DequeueTask claims the next available task for a worker.
func (s *WorkService) DequeueTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
if workerID == "" {
return nil, fmt.Errorf("worker_id is required")
}
task, err := s.queue.Dequeue(ctx, workerID)
if err != nil {
return nil, fmt.Errorf("dequeue task: %w", err)
}
if task != nil {
log := logging.FromContext(ctx).WithService("work")
log.Info("task claimed by worker",
"task_id", task.ID,
"worker_id", workerID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
)
}
return task, nil
}
// CompleteTask marks a task as successfully completed.
func (s *WorkService) CompleteTask(ctx context.Context, taskID string, result *domain.WorkResult) error {
// Get task for callback URL before completing
task, err := s.queue.GetTask(ctx, taskID)
if err != nil {
return fmt.Errorf("get task: %w", err)
}
if err := s.queue.Complete(ctx, taskID, result); err != nil {
return fmt.Errorf("complete task: %w", err)
}
log := logging.FromContext(ctx).WithService("work")
log.Info("task completed",
"task_id", taskID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
)
// Send webhook notification if callback URL is set
if task.CallbackURL != "" {
s.notifyCallback(task, "completed", result, "")
}
return nil
}
// FailTask marks a task as failed.
func (s *WorkService) FailTask(ctx context.Context, taskID string, errMsg string) error {
return s.FailTaskWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
}
// FailTaskWithCode marks a task as failed with a categorized error code.
// The error code enables clients to distinguish failure types and respond appropriately.
func (s *WorkService) FailTaskWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
// Get task for callback URL before failing
task, err := s.queue.GetTask(ctx, taskID)
if err != nil {
return fmt.Errorf("get task: %w", err)
}
if err := s.queue.FailWithCode(ctx, taskID, errMsg, code); err != nil {
return fmt.Errorf("fail task: %w", err)
}
// Check if it was requeued or permanently failed
log := logging.FromContext(ctx).WithService("work")
updatedTask, _ := s.queue.GetTask(ctx, taskID)
if updatedTask != nil && updatedTask.Status == domain.WorkTaskStatusFailed {
log.Warn("task failed permanently",
"task_id", taskID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
logging.FieldError, errMsg,
"error_code", code,
logging.FieldRetryCount, task.RetryCount,
)
// Send webhook notification for permanent failure
if task.CallbackURL != "" {
s.notifyCallback(task, "failed", nil, errMsg)
}
} else {
log.Warn("task failed, will retry",
"task_id", taskID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
logging.FieldError, errMsg,
logging.FieldRetryCount, task.RetryCount+1,
)
}
return nil
}
// CancelTask cancels a pending task.
func (s *WorkService) CancelTask(ctx context.Context, taskID string) error {
task, err := s.queue.GetTask(ctx, taskID)
if err != nil {
return fmt.Errorf("get task: %w", err)
}
if err := s.queue.Cancel(ctx, taskID); err != nil {
return err
}
log := logging.FromContext(ctx).WithService("work")
log.Info("task cancelled",
"task_id", taskID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
)
// Send webhook notification
if task.CallbackURL != "" {
s.notifyCallback(task, "cancelled", nil, "Task cancelled by user")
}
return nil
}
// GetTask retrieves a task by ID.
func (s *WorkService) GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error) {
return s.queue.GetTask(ctx, taskID)
}
// ListByProject returns tasks for a project with pagination.
func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return s.queue.ListByProject(ctx, projectID, status, opts)
}
// GetStats returns queue statistics.
func (s *WorkService) GetStats(ctx context.Context) (*domain.WorkQueueStats, error) {
return s.queue.GetStats(ctx)
}
// notifyCallback sends a webhook notification for task status changes.
func (s *WorkService) notifyCallback(task *domain.WorkTask, status string, result *domain.WorkResult, errMsg string) {
if s.webhookDispatcher == nil || task.CallbackURL == "" {
return
}
payload := map[string]any{
"task_id": task.ID,
"project_id": task.ProjectID,
"task_type": string(task.Type),
"status": status,
}
if result != nil {
payload["result"] = result
}
if errMsg != "" {
payload["error"] = errMsg
}
// Dispatch webhook asynchronously
go func() {
if err := s.webhookDispatcher.DispatchToURL(task.CallbackURL, "work."+status, payload); err != nil {
log := logging.Default().WithService("work")
log.Error("failed to send callback",
"task_id", task.ID,
"callback_url", task.CallbackURL,
logging.FieldError, err.Error(),
)
}
}()
}
// EnqueueTaskRequest contains parameters for enqueueing a task.
type EnqueueTaskRequest struct {
// ProjectID is the project this task belongs to.
ProjectID string `json:"project_id"`
// Type is the task type (build, test, deploy, custom).
Type domain.WorkTaskType `json:"task_type"`
// Spec contains task-specific parameters.
Spec map[string]any `json:"task_spec"`
// Priority determines execution order (higher = more urgent).
Priority int `json:"priority,omitempty"`
// CallbackURL is the webhook URL for completion notification.
CallbackURL string `json:"callback_url,omitempty"`
// MaxRetries is the maximum allowed retry attempts (default: 3).
MaxRetries int `json:"max_retries,omitempty"`
}
// EnqueueTaskResult contains the result of enqueueing a task.
type EnqueueTaskResult struct {
// TaskID is the unique task identifier.
TaskID string `json:"task_id"`
// StatusURL is the URL to check task status.
StatusURL string `json:"status_url"`
}