rdev/internal/service/project_service.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

358 lines
11 KiB
Go

// Package service provides business logic / use cases for the application.
// Services orchestrate domain operations using port interfaces.
package service
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/sanitize"
)
// ProjectService handles project-related business logic.
//
// The first three fields are the required ports supplied to NewProjectService.
// The optional collaborators are nil until the matching With* method is
// called, and the code that uses them checks for nil before calling them.
type ProjectService struct {
projects port.ProjectRepository // Project lookup, listing, existence checks, and status refresh
executor port.CommandExecutor // Runs commands for the legacy path (see executeCommand)
streams port.StreamPublisher // Publishes stream events and serves subscriptions
auditLogger port.AuditLogger // Optional audit logger; nil disables audit records
queue port.CommandQueue // Optional command queue, set via WithCommandQueue
webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher; nil disables webhook events
agentRegistry port.CodeAgentRegistry // Optional code agent registry, set via WithCodeAgentRegistry
cmdID atomic.Uint64 // Counter advanced by ExecuteClaude to number generated command IDs
}
// NewProjectService builds a ProjectService from its three required ports:
// the project repository, the command executor, and the stream publisher.
//
// Every optional collaborator (audit logger, command queue, webhook
// dispatcher, code agent registry) starts out nil; attach them afterwards
// with the corresponding With* method.
func NewProjectService(
	projects port.ProjectRepository,
	executor port.CommandExecutor,
	streams port.StreamPublisher,
) *ProjectService {
	svc := new(ProjectService)
	svc.projects = projects
	svc.executor = executor
	svc.streams = streams
	return svc
}
// WithAuditLogger attaches auditLogger to the service and hands back the same
// *ProjectService so configuration calls can be chained.
//
// While no audit logger is attached, ExecuteClaude and executeCommand skip
// their audit records.
func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService {
	s.auditLogger = auditLogger
	return s
}
// WithCommandQueue attaches the command queue intended for async execution
// and hands back the same *ProjectService so configuration calls can be
// chained.
func (s *ProjectService) WithCommandQueue(queue port.CommandQueue) *ProjectService {
	s.queue = queue
	return s
}
// WithWebhookDispatcher attaches the dispatcher used for event notifications
// and hands back the same *ProjectService so configuration calls can be
// chained.
//
// While no dispatcher is attached, dispatchWebhookEvent is a no-op.
func (s *ProjectService) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *ProjectService {
	s.webhookDispatcher = dispatcher
	return s
}
// WithCodeAgentRegistry attaches the code agent registry used for
// multi-provider support and hands back the same *ProjectService so
// configuration calls can be chained.
func (s *ProjectService) WithCodeAgentRegistry(registry port.CodeAgentRegistry) *ProjectService {
	s.agentRegistry = registry
	return s
}
// AuditContext contains audit-related information from the request.
//
// ExecuteClaude copies these values verbatim into the audit log entry it
// writes when both an audit logger and an AuditContext are present.
type AuditContext struct {
APIKeyID string // Recorded as the audit entry's APIKeyID
ClientIP string // Recorded as the audit entry's ClientIP
UserAgent string // Recorded as the audit entry's UserAgent
}
// List asks the repository to refresh project status and then returns
// whatever the repository lists.
//
// A refresh failure is only logged as a warning: the listing is still
// attempted and its result (or error) is returned unchanged.
func (s *ProjectService) List(ctx context.Context) ([]domain.Project, error) {
	logger := logging.FromContext(ctx).WithService("ProjectService")

	// Best effort: a failed refresh must not prevent the listing.
	refreshErr := s.projects.RefreshStatus(ctx)
	if refreshErr != nil {
		logger.Warn("failed to refresh project status", logging.FieldError, refreshErr)
	}

	return s.projects.List(ctx)
}
// Get looks up the project identified by id and returns it, or the
// repository's error when the lookup fails.
//
// After a successful lookup the repository is asked to refresh project
// status; a refresh failure is logged as a warning and does not affect the
// returned project.
//
// NOTE(review): the project is fetched before the refresh runs, whereas List
// refreshes first. Whether the returned value reflects the refreshed status
// depends on the repository implementation - confirm.
func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
	logger := logging.FromContext(ctx).WithService("ProjectService")

	found, getErr := s.projects.Get(ctx, id)
	if getErr != nil {
		return nil, getErr
	}

	// Best effort: a failed refresh is reported but never fails the lookup.
	if err := s.projects.RefreshStatus(ctx); err != nil {
		logger.Warn("failed to refresh project status", logging.FieldProjectID, id, logging.FieldError, err)
	}

	return found, nil
}
// Exists reports whether the repository knows a project with the given id.
// The repository's answer and error are passed through untouched.
func (s *ProjectService) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
	found, err := s.projects.Exists(ctx, id)
	return found, err
}
// ExecuteClaudeRequest contains parameters for running a Claude command.
//
// ProjectID and Prompt are required; ExecuteClaude rejects an unknown project
// and an empty or unsanitary prompt. All other fields are optional.
type ExecuteClaudeRequest struct {
ProjectID domain.ProjectID // Project whose pod runs the command
Prompt string // Prompt text; must be non-empty and pass sanitize.ClaudePrompt
StreamID string // Optional: when non-empty it is used as the command ID instead of a generated one
SessionID string // Optional: resume a previous session
Model string // Optional: model override (OpenCode only)
AllowedTools []string // Optional: restrict tool access
Audit *AuditContext // Optional audit context; nil skips the audit start record
}
// ExecuteClaudeResult contains the result of queuing a Claude command.
//
// It is returned as soon as execution has been started in the background; it
// does not carry the command's output or exit status.
type ExecuteClaudeResult struct {
CommandID domain.CommandID // ID of the started command (the request's StreamID when one was given)
StreamURL string // Path of the form /projects/<project id>/events?stream_id=<command id>
SessionID string // Session ID for continuation; ExecuteClaude echoes the request's SessionID on the agent path
AgentProvider domain.AgentProvider // Provider of the agent that took the request; left zero on the legacy executor path
}
// ExecuteClaude validates req, starts the command in the background, and
// returns immediately with the command ID and the URL of its event stream.
//
// Steps, in order:
//  1. The project is loaded; a lookup error is returned unchanged.
//  2. The prompt must be non-empty (domain.ErrInvalidCommand) and pass
//     sanitize.ClaudePrompt (domain.ErrCommandSanitization); the stream ID
//     must pass sanitize.StreamID (domain.ErrInvalidCommand).
//  3. The command ID is req.StreamID when provided, otherwise
//     "cmd-<project>-<counter>".
//  4. If both an audit logger and req.Audit are present, an audit start
//     record is written; a failure there is only logged.
//  5. Execution is handed to a goroutine: the resolved code agent when
//     resolveAgent returns one, otherwise the legacy executor.
//
// The returned error is non-nil only for steps 1 and 2; failures of the
// background execution are not reported through this call.
func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeRequest) (*ExecuteClaudeResult, error) {
	project, err := s.projects.Get(ctx, req.ProjectID)
	if err != nil {
		return nil, err
	}

	// Input validation: prompt first, then stream ID.
	if req.Prompt == "" {
		return nil, fmt.Errorf("%w: prompt is required", domain.ErrInvalidCommand)
	}
	if promptErr := sanitize.ClaudePrompt(req.Prompt); promptErr != nil {
		return nil, fmt.Errorf("%w: %w", domain.ErrCommandSanitization, promptErr)
	}
	if streamErr := sanitize.StreamID(req.StreamID); streamErr != nil {
		return nil, fmt.Errorf("%w: %w", domain.ErrInvalidCommand, streamErr)
	}

	// The counter advances on every accepted request, even when the caller
	// supplies its own stream ID and the generated form ends up unused.
	seq := s.cmdID.Add(1)
	var commandID domain.CommandID
	if req.StreamID != "" {
		commandID = domain.CommandID(req.StreamID)
	} else {
		commandID = domain.CommandID(fmt.Sprintf("cmd-%s-%03d", req.ProjectID, seq))
	}

	command := &domain.Command{
		ID:        commandID,
		ProjectID: req.ProjectID,
		Type:      domain.CommandTypeClaude,
		Args:      []string{req.Prompt},
		StartedAt: time.Now(),
	}

	// Audit start record: needs both a configured logger and request context.
	if s.auditLogger != nil && req.Audit != nil {
		logger := logging.FromContext(ctx).WithService("ProjectService")
		encodedArgs, _ := json.Marshal(command.Args)
		entry := &domain.AuditLogEntry{
			ID:          uuid.New().String(),
			APIKeyID:    req.Audit.APIKeyID,
			CommandID:   string(commandID),
			ProjectID:   string(req.ProjectID),
			CommandType: domain.CommandTypeClaude,
			Args:        string(encodedArgs),
			ClientIP:    req.Audit.ClientIP,
			UserAgent:   req.Audit.UserAgent,
			StartedAt:   command.StartedAt,
			Status:      domain.AuditStatusRunning,
		}
		if auditErr := s.auditLogger.LogCommandStart(ctx, entry); auditErr != nil {
			logger.Warn("failed to log audit start", "command_id", commandID, logging.FieldError, auditErr)
		}
	}

	agent := s.resolveAgent(project)
	if agent == nil {
		// No code agent available: fall back to the legacy executor.
		go s.executeCommand(project.PodName, command)
		return &ExecuteClaudeResult{
			CommandID: commandID,
			StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, commandID),
		}, nil
	}

	// A code agent is available: hand it the full request.
	agentReq := &domain.AgentRequest{
		Prompt:       req.Prompt,
		ProjectID:    req.ProjectID,
		SessionID:    req.SessionID,
		Model:        req.Model,
		AllowedTools: req.AllowedTools,
		Metadata:     map[string]string{"pod_name": project.PodName},
	}
	go s.executeAgentCommand(agent, agentReq, command)

	return &ExecuteClaudeResult{
		CommandID:     commandID,
		StreamURL:     fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, commandID),
		SessionID:     req.SessionID, // echoed from the request at this point
		AgentProvider: agent.Provider(),
	}, nil
}
// executeCommand runs cmd in the pod named podName through the legacy
// executor and reports progress and outcome to every configured sink.
//
// It is started as a goroutine by ExecuteClaude and therefore owns its own
// contexts and returns nothing. In order it:
//  1. dispatches a command.started webhook event;
//  2. executes the command, publishing every output line as an "output"
//     stream event and counting the output bytes;
//  3. publishes a "complete" stream event with exit code and duration;
//  4. records a metric, the audit end record (if an audit logger is set) and
//     a command.completed / command.failed webhook event;
//  5. schedules the stream to be closed after a short delay.
//
// A command counts as failed when the result carries an error, when the
// executor itself returned an error, or when the exit code is non-zero.
// Metrics, audit, and webhook all use that same classification.
func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) {
	const (
		// executionTimeout bounds how long the command itself may run.
		executionTimeout = 10 * time.Minute
		// bookkeepingTimeout bounds the audit and webhook calls made after
		// the command has finished.
		bookkeepingTimeout = 30 * time.Second
		// streamCloseDelay is how long the stream stays open after the
		// "complete" event has been published.
		streamCloseDelay = 30 * time.Second
	)

	execCtx, cancelExec := context.WithTimeout(context.Background(), executionTimeout)
	defer cancelExec()

	log := logging.FromContext(execCtx).WithService("ProjectService")
	streamID := string(cmd.ID)

	var lastEventID string
	var outputSizeBytes int64

	// Dispatch command.started webhook event.
	s.dispatchWebhookEvent(execCtx, string(cmd.ProjectID), domain.WebhookEventCommandStarted, &domain.CommandEventData{
		CommandID:   string(cmd.ID),
		CommandType: cmd.Type,
		ProjectID:   string(cmd.ProjectID),
		StartedAt:   cmd.StartedAt,
	})

	result, execErr := s.executor.Execute(execCtx, cmd, podName, func(line domain.OutputLine) {
		lastEventID = s.streams.Publish(streamID, port.StreamEvent{
			Type: "output",
			Data: map[string]any{
				"line":   line.Line,
				"stream": line.Stream,
			},
		})
		outputSizeBytes += int64(len(line.Line))
	})
	if execErr != nil {
		// Surface the executor error instead of silently discarding it.
		log.Warn("command execution returned an error", "command_id", cmd.ID, logging.FieldError, execErr)
	}
	// NOTE(review): result is dereferenced below without a guard. If the
	// executor can return an unusable result together with an error, that
	// case needs explicit handling here - confirm against port.CommandExecutor.

	// Send completion event.
	completeEventID := s.streams.Publish(streamID, port.StreamEvent{
		Type: "complete",
		Data: map[string]any{
			"exit_code":   result.ExitCode,
			"duration_ms": result.DurationMs,
		},
	})

	// Classify the outcome once so metrics, audit, and webhook always agree.
	failed := result.ExitCode != 0
	var errorMsg string
	switch {
	case result.Error != nil:
		failed = true
		errorMsg = result.Error.Error()
	case execErr != nil:
		failed = true
		errorMsg = execErr.Error()
	}

	var (
		metricStatus     = "success"
		auditStatus      domain.AuditStatus
		webhookEventType domain.WebhookEventType
	)
	if failed {
		metricStatus = "error"
		auditStatus = domain.AuditStatusError
		webhookEventType = domain.WebhookEventCommandFailed
	} else {
		auditStatus = domain.AuditStatusSuccess
		webhookEventType = domain.WebhookEventCommandCompleted
	}

	// Record metrics.
	metrics.RecordCommand(string(cmd.ProjectID), string(cmd.Type), metricStatus, result.DurationMs)

	// The execution context has already expired when the command ran into its
	// timeout. Bookkeeping gets a fresh context so that exactly those failures
	// are still audited and announced.
	postCtx, cancelPost := context.WithTimeout(context.Background(), bookkeepingTimeout)
	defer cancelPost()

	// Log audit completion if an audit logger is configured.
	if s.auditLogger != nil {
		auditResult := &domain.AuditResult{
			ExitCode:        result.ExitCode,
			DurationMs:      result.DurationMs,
			Status:          auditStatus,
			ErrorMessage:    errorMsg,
			OutputSizeBytes: outputSizeBytes,
		}
		if err := s.auditLogger.LogCommandEnd(postCtx, string(cmd.ID), auditResult); err != nil {
			log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
		}
	}

	// Dispatch command.completed or command.failed webhook event.
	s.dispatchWebhookEvent(postCtx, string(cmd.ProjectID), webhookEventType, &domain.CommandEventData{
		CommandID:   string(cmd.ID),
		CommandType: cmd.Type,
		ProjectID:   string(cmd.ProjectID),
		StartedAt:   cmd.StartedAt,
		CompletedAt: time.Now(),
		ExitCode:    result.ExitCode,
		DurationMs:  result.DurationMs,
		Error:       errorMsg,
	})

	log.Debug("command completed",
		"command_id", cmd.ID,
		"exit_code", result.ExitCode,
		logging.FieldDuration, result.DurationMs,
		"last_event_id", lastEventID,
		"complete_event_id", completeEventID,
	)

	// Close the stream after a delay rather than immediately; a timer avoids
	// parking a goroutine in time.Sleep for the whole delay.
	time.AfterFunc(streamCloseDelay, func() {
		s.streams.Close(streamID)
	})
}
// dispatchWebhookEvent wraps data in a domain.WebhookEvent of the given type,
// stamped with the current time, and hands it to the configured dispatcher.
//
// It does nothing when no dispatcher is configured. A dispatch error is
// logged as a warning and otherwise ignored, so the caller's flow is never
// interrupted by webhook delivery.
func (s *ProjectService) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) {
	if s.webhookDispatcher == nil {
		return
	}

	dispatchErr := s.webhookDispatcher.Dispatch(ctx, projectID, &domain.WebhookEvent{
		Type:      eventType,
		Timestamp: time.Now(),
		ProjectID: projectID,
		Data:      data,
	})
	if dispatchErr == nil {
		return
	}

	logger := logging.FromContext(ctx).WithService("ProjectService")
	logger.Warn("failed to dispatch webhook event",
		logging.FieldProjectID, projectID,
		"event_type", eventType,
		logging.FieldError, dispatchErr,
	)
}
// Subscribe delegates to the stream publisher and returns its receive-only
// event channel for streamID together with the func() the publisher hands
// back alongside it.
//
// NOTE(review): the returned func is presumably what callers invoke to end
// the subscription - confirm against port.StreamPublisher.
func (s *ProjectService) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
	events, done := s.streams.Subscribe(streamID)
	return events, done
}
// SubscribeFromID delegates to the stream publisher and returns its
// receive-only event channel for streamID, starting from the event
// identified by lastEventID, together with the func() the publisher hands
// back alongside it.
//
// It exists for SSE reconnection, where the client reports the last event it
// saw via Last-Event-ID.
func (s *ProjectService) SubscribeFromID(streamID, lastEventID string) (<-chan port.StreamEvent, func()) {
	events, done := s.streams.SubscribeFromID(streamID, lastEventID)
	return events, done
}