// Package service provides business logic / use cases for the application. // Services orchestrate domain operations using port interfaces. package service import ( "context" "encoding/json" "fmt" "sync/atomic" "time" "github.com/google/uuid" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/sanitize" ) // ProjectService handles project-related business logic. type ProjectService struct { projects port.ProjectRepository executor port.CommandExecutor streams port.StreamPublisher auditLogger port.AuditLogger // Optional audit logger queue port.CommandQueue // Optional command queue webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher agentRegistry port.CodeAgentRegistry // Optional code agent registry cmdID atomic.Uint64 } // NewProjectService creates a new project service. func NewProjectService( projects port.ProjectRepository, executor port.CommandExecutor, streams port.StreamPublisher, ) *ProjectService { return &ProjectService{ projects: projects, executor: executor, streams: streams, } } // WithAuditLogger sets an audit logger for the service. func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService { s.auditLogger = auditLogger return s } // WithCommandQueue sets a command queue for async execution. func (s *ProjectService) WithCommandQueue(queue port.CommandQueue) *ProjectService { s.queue = queue return s } // WithWebhookDispatcher sets a webhook dispatcher for event notifications. func (s *ProjectService) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *ProjectService { s.webhookDispatcher = dispatcher return s } // WithCodeAgentRegistry sets a code agent registry for multi-provider support. func (s *ProjectService) WithCodeAgentRegistry(registry port.CodeAgentRegistry) *ProjectService { s.agentRegistry = registry return s } // AuditContext contains audit-related information from the request. type AuditContext struct { APIKeyID string ClientIP string UserAgent string } // List returns all available projects with refreshed status. func (s *ProjectService) List(ctx context.Context) ([]domain.Project, error) { log := logging.FromContext(ctx).WithService("ProjectService") // Refresh status from Kubernetes if err := s.projects.RefreshStatus(ctx); err != nil { log.Warn("failed to refresh project status", logging.FieldError, err) } return s.projects.List(ctx) } // Get returns a specific project by ID. func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) { log := logging.FromContext(ctx).WithService("ProjectService") project, err := s.projects.Get(ctx, id) if err != nil { return nil, err } // Refresh status if refreshErr := s.projects.RefreshStatus(ctx); refreshErr != nil { log.Warn("failed to refresh project status", logging.FieldProjectID, id, logging.FieldError, refreshErr) } return project, nil } // Exists checks if a project exists. func (s *ProjectService) Exists(ctx context.Context, id domain.ProjectID) (bool, error) { return s.projects.Exists(ctx, id) } // ExecuteClaudeRequest contains parameters for running a Claude command. type ExecuteClaudeRequest struct { ProjectID domain.ProjectID Prompt string StreamID string SessionID string // Optional: resume a previous session Model string // Optional: model override (OpenCode only) AllowedTools []string // Optional: restrict tool access Audit *AuditContext // Optional audit context } // ExecuteClaudeResult contains the result of queuing a Claude command. type ExecuteClaudeResult struct { CommandID domain.CommandID StreamURL string SessionID string // Session ID for continuation AgentProvider domain.AgentProvider } // ExecuteClaude runs a Claude command in the project's pod. func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeRequest) (*ExecuteClaudeResult, error) { // Validate project exists project, err := s.projects.Get(ctx, req.ProjectID) if err != nil { return nil, err } // Validate prompt if req.Prompt == "" { return nil, fmt.Errorf("%w: prompt is required", domain.ErrInvalidCommand) } if err := sanitize.ClaudePrompt(req.Prompt); err != nil { return nil, fmt.Errorf("%w: %w", domain.ErrCommandSanitization, err) } // Validate stream ID if err := sanitize.StreamID(req.StreamID); err != nil { return nil, fmt.Errorf("%w: %w", domain.ErrInvalidCommand, err) } // Generate command ID cmdNum := s.cmdID.Add(1) cmdID := domain.CommandID(fmt.Sprintf("cmd-%s-%03d", req.ProjectID, cmdNum)) if req.StreamID != "" { cmdID = domain.CommandID(req.StreamID) } // Create command cmd := &domain.Command{ ID: cmdID, ProjectID: req.ProjectID, Type: domain.CommandTypeClaude, Args: []string{req.Prompt}, StartedAt: time.Now(), } // Log audit start if audit logger is configured if s.auditLogger != nil && req.Audit != nil { log := logging.FromContext(ctx).WithService("ProjectService") argsJSON, _ := json.Marshal(cmd.Args) auditEntry := &domain.AuditLogEntry{ ID: uuid.New().String(), APIKeyID: req.Audit.APIKeyID, CommandID: string(cmdID), ProjectID: string(req.ProjectID), CommandType: domain.CommandTypeClaude, Args: string(argsJSON), ClientIP: req.Audit.ClientIP, UserAgent: req.Audit.UserAgent, StartedAt: cmd.StartedAt, Status: domain.AuditStatusRunning, } if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err) } } // Resolve agent and execute agent := s.resolveAgent(project) if agent != nil { // Use CodeAgent for execution agentReq := &domain.AgentRequest{ Prompt: req.Prompt, ProjectID: req.ProjectID, SessionID: req.SessionID, Model: req.Model, AllowedTools: req.AllowedTools, Metadata: map[string]string{"pod_name": project.PodName}, } go s.executeAgentCommand(ctx, agent, agentReq, cmd) return &ExecuteClaudeResult{ CommandID: cmdID, StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, cmdID), SessionID: req.SessionID, // Will be updated by agent result AgentProvider: agent.Provider(), }, nil } // Fallback to legacy executor go s.executeCommand(ctx, project.PodName, cmd) return &ExecuteClaudeResult{ CommandID: cmdID, StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, cmdID), }, nil } // executeCommand runs a command and streams output to subscribers. // Uses context.WithoutCancel to preserve tracing/values but allow independent timeout. func (s *ProjectService) executeCommand(parentCtx context.Context, podName string, cmd *domain.Command) { // Derive from parent to preserve tracing/values, but with independent cancellation ctx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 10*time.Minute) defer cancel() log := logging.FromContext(ctx).WithService("ProjectService") streamID := string(cmd.ID) var lastEventID string var outputSizeBytes int64 // Dispatch command.started webhook event s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), domain.WebhookEventCommandStarted, &domain.CommandEventData{ CommandID: string(cmd.ID), CommandType: cmd.Type, ProjectID: string(cmd.ProjectID), StartedAt: cmd.StartedAt, }) result, _ := s.executor.Execute(ctx, cmd, podName, func(line domain.OutputLine) { eventID := s.streams.Publish(streamID, port.StreamEvent{ Type: "output", Data: map[string]any{ "line": line.Line, "stream": line.Stream, }, }) lastEventID = eventID outputSizeBytes += int64(len(line.Line)) }) // Send completion event eventID := s.streams.Publish(streamID, port.StreamEvent{ Type: "complete", Data: map[string]any{ "exit_code": result.ExitCode, "duration_ms": result.DurationMs, }, }) // Record metrics status := "success" if result.ExitCode != 0 { status = "error" } metrics.RecordCommand(string(cmd.ProjectID), string(cmd.Type), status, result.DurationMs) // Log audit completion if audit logger is configured if s.auditLogger != nil { var auditStatus domain.AuditStatus var errorMsg string if result.Error != nil { auditStatus = domain.AuditStatusError errorMsg = result.Error.Error() } else if result.ExitCode != 0 { auditStatus = domain.AuditStatusError } else { auditStatus = domain.AuditStatusSuccess } auditResult := &domain.AuditResult{ ExitCode: result.ExitCode, DurationMs: result.DurationMs, Status: auditStatus, ErrorMessage: errorMsg, OutputSizeBytes: outputSizeBytes, } if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil { log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err) } } // Dispatch command.completed or command.failed webhook event completedAt := time.Now() var webhookEventType domain.WebhookEventType var errorMsg string if result.Error != nil { webhookEventType = domain.WebhookEventCommandFailed errorMsg = result.Error.Error() } else if result.ExitCode != 0 { webhookEventType = domain.WebhookEventCommandFailed } else { webhookEventType = domain.WebhookEventCommandCompleted } s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), webhookEventType, &domain.CommandEventData{ CommandID: string(cmd.ID), CommandType: cmd.Type, ProjectID: string(cmd.ProjectID), StartedAt: cmd.StartedAt, CompletedAt: completedAt, ExitCode: result.ExitCode, DurationMs: result.DurationMs, Error: errorMsg, }) log.Debug("command completed", "command_id", cmd.ID, "exit_code", result.ExitCode, logging.FieldDuration, result.DurationMs, "last_event_id", lastEventID, "complete_event_id", eventID, ) // Clean up stream after a delay go func() { time.Sleep(30 * time.Second) s.streams.Close(streamID) }() } // dispatchWebhookEvent dispatches a webhook event if a dispatcher is configured. func (s *ProjectService) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) { if s.webhookDispatcher == nil { return } event := &domain.WebhookEvent{ Type: eventType, Timestamp: time.Now(), ProjectID: projectID, Data: data, } if err := s.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { log := logging.FromContext(ctx).WithService("ProjectService") log.Warn("failed to dispatch webhook event", logging.FieldProjectID, projectID, "event_type", eventType, logging.FieldError, err, ) } } // Subscribe returns a channel for receiving stream events. func (s *ProjectService) Subscribe(streamID string) (<-chan port.StreamEvent, func()) { return s.streams.Subscribe(streamID) } // SubscribeFromID returns a channel for receiving stream events, starting from a specific event ID. // This is used for SSE reconnection with Last-Event-ID support. func (s *ProjectService) SubscribeFromID(streamID, lastEventID string) (<-chan port.StreamEvent, func()) { return s.streams.SubscribeFromID(streamID, lastEventID) }