All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
- Fix no-op RequireProjectAccess middleware to enforce project_ids
- Apply project access middleware to all project-scoped routes
- Filter GET /projects by allowed project IDs for restricted keys
- Add GET /me endpoint with key identity, scopes, and project access info
- Add PATCH /keys/{id} for partial key updates (name, scopes, project_ids, allowed_ips, expires_in)
- Add GET/POST/DELETE /projects/{id}/access for project-centric access management
- Auto-grant creating key access when using POST /project/create-and-build
- Accept grant_to_key_ids in create-and-build to grant multiple keys on project creation
- Move newProvisionerWithDeps test helper from production code to test file
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
383 lines
12 KiB
Go
383 lines
12 KiB
Go
// Package service provides business logic / use cases for the application.
|
|
// Services orchestrate domain operations using port interfaces.
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/logging"
|
|
"github.com/orchard9/rdev/internal/metrics"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
"github.com/orchard9/rdev/internal/sanitize"
|
|
)
|
|
|
|
// ProjectService handles project-related business logic.
|
|
type ProjectService struct {
|
|
projects port.ProjectRepository
|
|
executor port.CommandExecutor
|
|
streams port.StreamPublisher
|
|
auditLogger port.AuditLogger // Optional audit logger
|
|
queue port.CommandQueue // Optional command queue
|
|
webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher
|
|
agentRegistry port.CodeAgentRegistry // Optional code agent registry
|
|
cmdID atomic.Uint64
|
|
}
|
|
|
|
// NewProjectService creates a new project service.
|
|
func NewProjectService(
|
|
projects port.ProjectRepository,
|
|
executor port.CommandExecutor,
|
|
streams port.StreamPublisher,
|
|
) *ProjectService {
|
|
return &ProjectService{
|
|
projects: projects,
|
|
executor: executor,
|
|
streams: streams,
|
|
}
|
|
}
|
|
|
|
// WithAuditLogger sets an audit logger for the service.
|
|
func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService {
|
|
s.auditLogger = auditLogger
|
|
return s
|
|
}
|
|
|
|
// WithCommandQueue sets a command queue for async execution.
|
|
func (s *ProjectService) WithCommandQueue(queue port.CommandQueue) *ProjectService {
|
|
s.queue = queue
|
|
return s
|
|
}
|
|
|
|
// WithWebhookDispatcher sets a webhook dispatcher for event notifications.
|
|
func (s *ProjectService) WithWebhookDispatcher(dispatcher port.WebhookDispatcher) *ProjectService {
|
|
s.webhookDispatcher = dispatcher
|
|
return s
|
|
}
|
|
|
|
// WithCodeAgentRegistry sets a code agent registry for multi-provider support.
|
|
func (s *ProjectService) WithCodeAgentRegistry(registry port.CodeAgentRegistry) *ProjectService {
|
|
s.agentRegistry = registry
|
|
return s
|
|
}
|
|
|
|
// AuditContext carries the request-level details that are copied into an
// audit log entry when a command is started.
type AuditContext struct {
	// APIKeyID is recorded as the audit entry's API key ID.
	APIKeyID string
	// ClientIP is recorded as the audit entry's client IP.
	ClientIP string
	// UserAgent is recorded as the audit entry's user agent.
	UserAgent string
}
|
|
|
|
// List returns available projects with refreshed status.
|
|
// allowedProjectIDs restricts results to specific projects; nil means unrestricted.
|
|
func (s *ProjectService) List(ctx context.Context, allowedProjectIDs []domain.ProjectID) ([]domain.Project, error) {
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
// Refresh status from Kubernetes
|
|
if err := s.projects.RefreshStatus(ctx); err != nil {
|
|
log.Warn("failed to refresh project status", logging.FieldError, err)
|
|
}
|
|
|
|
projects, err := s.projects.List(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// nil = unrestricted (admin or no project_ids restriction)
|
|
if allowedProjectIDs == nil {
|
|
return projects, nil
|
|
}
|
|
|
|
// Filter to only allowed projects
|
|
allowed := make(map[domain.ProjectID]bool, len(allowedProjectIDs))
|
|
for _, id := range allowedProjectIDs {
|
|
allowed[id] = true
|
|
}
|
|
filtered := projects[:0]
|
|
for _, p := range projects {
|
|
if allowed[p.ID] {
|
|
filtered = append(filtered, p)
|
|
}
|
|
}
|
|
return filtered, nil
|
|
}
|
|
|
|
// Get returns a specific project by ID.
|
|
func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
project, err := s.projects.Get(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Refresh status
|
|
if refreshErr := s.projects.RefreshStatus(ctx); refreshErr != nil {
|
|
log.Warn("failed to refresh project status", logging.FieldProjectID, id, logging.FieldError, refreshErr)
|
|
}
|
|
|
|
return project, nil
|
|
}
|
|
|
|
// Exists checks if a project exists.
|
|
func (s *ProjectService) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
|
|
return s.projects.Exists(ctx, id)
|
|
}
|
|
|
|
// ExecuteClaudeRequest contains parameters for running a Claude command.
|
|
type ExecuteClaudeRequest struct {
|
|
ProjectID domain.ProjectID
|
|
Prompt string
|
|
StreamID string
|
|
SessionID string // Optional: resume a previous session
|
|
Model string // Optional: model override (OpenCode only)
|
|
AllowedTools []string // Optional: restrict tool access
|
|
Audit *AuditContext // Optional audit context
|
|
}
|
|
|
|
// ExecuteClaudeResult contains the result of queuing a Claude command.
|
|
type ExecuteClaudeResult struct {
|
|
CommandID domain.CommandID
|
|
StreamURL string
|
|
SessionID string // Session ID for continuation
|
|
AgentProvider domain.AgentProvider
|
|
}
|
|
|
|
// ExecuteClaude runs a Claude command in the project's pod.
|
|
func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeRequest) (*ExecuteClaudeResult, error) {
|
|
// Validate project exists
|
|
project, err := s.projects.Get(ctx, req.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Validate prompt
|
|
if req.Prompt == "" {
|
|
return nil, fmt.Errorf("%w: prompt is required", domain.ErrInvalidCommand)
|
|
}
|
|
if err := sanitize.ClaudePrompt(req.Prompt); err != nil {
|
|
return nil, fmt.Errorf("%w: %w", domain.ErrCommandSanitization, err)
|
|
}
|
|
|
|
// Validate stream ID
|
|
if err := sanitize.StreamID(req.StreamID); err != nil {
|
|
return nil, fmt.Errorf("%w: %w", domain.ErrInvalidCommand, err)
|
|
}
|
|
|
|
// Generate command ID
|
|
cmdNum := s.cmdID.Add(1)
|
|
cmdID := domain.CommandID(fmt.Sprintf("cmd-%s-%03d", req.ProjectID, cmdNum))
|
|
if req.StreamID != "" {
|
|
cmdID = domain.CommandID(req.StreamID)
|
|
}
|
|
|
|
// Create command
|
|
cmd := &domain.Command{
|
|
ID: cmdID,
|
|
ProjectID: req.ProjectID,
|
|
Type: domain.CommandTypeClaude,
|
|
Args: []string{req.Prompt},
|
|
StartedAt: time.Now(),
|
|
}
|
|
|
|
// Log audit start if audit logger is configured
|
|
if s.auditLogger != nil && req.Audit != nil {
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
argsJSON, _ := json.Marshal(cmd.Args)
|
|
auditEntry := &domain.AuditLogEntry{
|
|
ID: uuid.New().String(),
|
|
APIKeyID: req.Audit.APIKeyID,
|
|
CommandID: string(cmdID),
|
|
ProjectID: string(req.ProjectID),
|
|
CommandType: domain.CommandTypeClaude,
|
|
Args: string(argsJSON),
|
|
ClientIP: req.Audit.ClientIP,
|
|
UserAgent: req.Audit.UserAgent,
|
|
StartedAt: cmd.StartedAt,
|
|
Status: domain.AuditStatusRunning,
|
|
}
|
|
if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil {
|
|
log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err)
|
|
}
|
|
}
|
|
|
|
// Resolve agent and execute
|
|
agent := s.resolveAgent(project)
|
|
if agent != nil {
|
|
// Use CodeAgent for execution
|
|
agentReq := &domain.AgentRequest{
|
|
Prompt: req.Prompt,
|
|
ProjectID: req.ProjectID,
|
|
SessionID: req.SessionID,
|
|
Model: req.Model,
|
|
AllowedTools: req.AllowedTools,
|
|
Metadata: map[string]string{"pod_name": project.PodName},
|
|
}
|
|
go s.executeAgentCommand(ctx, agent, agentReq, cmd)
|
|
|
|
return &ExecuteClaudeResult{
|
|
CommandID: cmdID,
|
|
StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, cmdID),
|
|
SessionID: req.SessionID, // Will be updated by agent result
|
|
AgentProvider: agent.Provider(),
|
|
}, nil
|
|
}
|
|
|
|
// Fallback to legacy executor
|
|
go s.executeCommand(ctx, project.PodName, cmd)
|
|
|
|
return &ExecuteClaudeResult{
|
|
CommandID: cmdID,
|
|
StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, cmdID),
|
|
}, nil
|
|
}
|
|
|
|
// executeCommand runs a command and streams output to subscribers.
|
|
// Uses context.WithoutCancel to preserve tracing/values but allow independent timeout.
|
|
func (s *ProjectService) executeCommand(parentCtx context.Context, podName string, cmd *domain.Command) {
|
|
// Derive from parent to preserve tracing/values, but with independent cancellation
|
|
ctx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
streamID := string(cmd.ID)
|
|
var lastEventID string
|
|
var outputSizeBytes int64
|
|
|
|
// Dispatch command.started webhook event
|
|
s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), domain.WebhookEventCommandStarted, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.Type,
|
|
ProjectID: string(cmd.ProjectID),
|
|
StartedAt: cmd.StartedAt,
|
|
})
|
|
|
|
result, _ := s.executor.Execute(ctx, cmd, podName, func(line domain.OutputLine) {
|
|
eventID := s.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "output",
|
|
Data: map[string]any{
|
|
"line": line.Line,
|
|
"stream": line.Stream,
|
|
},
|
|
})
|
|
lastEventID = eventID
|
|
outputSizeBytes += int64(len(line.Line))
|
|
})
|
|
|
|
// Send completion event
|
|
eventID := s.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "complete",
|
|
Data: map[string]any{
|
|
"exit_code": result.ExitCode,
|
|
"duration_ms": result.DurationMs,
|
|
},
|
|
})
|
|
|
|
// Record metrics
|
|
status := "success"
|
|
if result.ExitCode != 0 {
|
|
status = "error"
|
|
}
|
|
metrics.RecordCommand(string(cmd.ProjectID), string(cmd.Type), status, result.DurationMs)
|
|
|
|
// Log audit completion if audit logger is configured
|
|
if s.auditLogger != nil {
|
|
var auditStatus domain.AuditStatus
|
|
var errorMsg string
|
|
if result.Error != nil {
|
|
auditStatus = domain.AuditStatusError
|
|
errorMsg = result.Error.Error()
|
|
} else if result.ExitCode != 0 {
|
|
auditStatus = domain.AuditStatusError
|
|
} else {
|
|
auditStatus = domain.AuditStatusSuccess
|
|
}
|
|
|
|
auditResult := &domain.AuditResult{
|
|
ExitCode: result.ExitCode,
|
|
DurationMs: result.DurationMs,
|
|
Status: auditStatus,
|
|
ErrorMessage: errorMsg,
|
|
OutputSizeBytes: outputSizeBytes,
|
|
}
|
|
if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil {
|
|
log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
|
|
}
|
|
}
|
|
|
|
// Dispatch command.completed or command.failed webhook event
|
|
completedAt := time.Now()
|
|
var webhookEventType domain.WebhookEventType
|
|
var errorMsg string
|
|
if result.Error != nil {
|
|
webhookEventType = domain.WebhookEventCommandFailed
|
|
errorMsg = result.Error.Error()
|
|
} else if result.ExitCode != 0 {
|
|
webhookEventType = domain.WebhookEventCommandFailed
|
|
} else {
|
|
webhookEventType = domain.WebhookEventCommandCompleted
|
|
}
|
|
|
|
s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), webhookEventType, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.Type,
|
|
ProjectID: string(cmd.ProjectID),
|
|
StartedAt: cmd.StartedAt,
|
|
CompletedAt: completedAt,
|
|
ExitCode: result.ExitCode,
|
|
DurationMs: result.DurationMs,
|
|
Error: errorMsg,
|
|
})
|
|
|
|
log.Debug("command completed",
|
|
"command_id", cmd.ID,
|
|
"exit_code", result.ExitCode,
|
|
logging.FieldDuration, result.DurationMs,
|
|
"last_event_id", lastEventID,
|
|
"complete_event_id", eventID,
|
|
)
|
|
|
|
// Clean up stream after a delay
|
|
go func() {
|
|
time.Sleep(30 * time.Second)
|
|
s.streams.Close(streamID)
|
|
}()
|
|
}
|
|
|
|
// dispatchWebhookEvent dispatches a webhook event if a dispatcher is configured.
|
|
func (s *ProjectService) dispatchWebhookEvent(ctx context.Context, projectID string, eventType domain.WebhookEventType, data any) {
|
|
if s.webhookDispatcher == nil {
|
|
return
|
|
}
|
|
|
|
event := &domain.WebhookEvent{
|
|
Type: eventType,
|
|
Timestamp: time.Now(),
|
|
ProjectID: projectID,
|
|
Data: data,
|
|
}
|
|
|
|
if err := s.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil {
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
log.Warn("failed to dispatch webhook event",
|
|
logging.FieldProjectID, projectID,
|
|
"event_type", eventType,
|
|
logging.FieldError, err,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Subscribe returns a channel for receiving stream events.
|
|
func (s *ProjectService) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
|
|
return s.streams.Subscribe(streamID)
|
|
}
|
|
|
|
// SubscribeFromID returns a channel for receiving stream events, starting from a specific event ID.
|
|
// This is used for SSE reconnection with Last-Event-ID support.
|
|
func (s *ProjectService) SubscribeFromID(streamID, lastEventID string) (<-chan port.StreamEvent, func()) {
|
|
return s.streams.SubscribeFromID(streamID, lastEventID)
|
|
}
|