rdev/internal/service/project_service_agent.go
jordan 853ec4cf81 fix: go.work race condition with batch components and idempotent provisioning
Three coordinated fixes for CI pipeline race conditions:

1. Woodpecker step dependencies: Added depends_on: [deps] to all 6 component
   templates (service, worker, cli, app-astro, app-react, app-nextjs) so build
   steps wait for go work sync to complete.

2. Idempotent resource provisioning: Modified provisionResources() to check
   for existing database/cache before creating, preventing "already exists"
   errors on component re-adds.

3. Batch component endpoint: POST /projects/{id}/components/batch enables
   atomic multi-component additions in a single git commit. Validates all
   components upfront, provisions infra sequentially, commits code components
   atomically.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 12:31:40 -07:00

235 lines
6.6 KiB
Go

// Package service provides business logic / use cases for the application.
// This file contains CodeAgent integration for multi-provider support.
package service
import (
"context"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
)
// resolveAgent picks the CodeAgent that should serve the given project.
//
// The agent registered under project.AgentProvider wins when that field is set
// and the registry knows the provider; otherwise the registry's default agent
// is returned. The result is nil when no registry is configured, and may be
// nil when the registry has no default agent.
func (s *ProjectService) resolveAgent(project *domain.Project) port.CodeAgent {
	registry := s.agentRegistry
	if registry == nil {
		return nil
	}

	// A project-level provider takes precedence over the registry default.
	if provider := project.AgentProvider; provider != "" {
		if preferred := registry.Get(provider); preferred != nil {
			return preferred
		}
	}

	// Unset or unknown provider: use whatever the registry considers default.
	return registry.Default()
}
// executeAgentCommand runs req on the given CodeAgent and relays the agent's
// events to the stream identified by cmd.ID.
//
// The execution context is derived from parentCtx with context.WithoutCancel,
// so it keeps the parent's values but is not cancelled together with the
// parent; it is bounded by its own 10-minute timeout instead.
//
// In order, the function:
//  1. dispatches a command.started webhook event,
//  2. calls agent.Execute, publishing one stream event per agent event,
//  3. publishes a final "complete" stream event,
//  4. records command and agent metrics,
//  5. writes the audit end record when an audit logger is configured,
//  6. dispatches a command.completed or command.failed webhook event,
//  7. schedules the stream to be closed 30 seconds later.
//
// A command counts as failed when agent.Execute returns an error, when
// result.Error is set, or when result.ExitCode is non-zero. The same verdict
// is used for metrics, the audit record and the webhook event.
func (s *ProjectService) executeAgentCommand(parentCtx context.Context, agent port.CodeAgent, req *domain.AgentRequest, cmd *domain.Command) {
	// Keep the parent's values but give the execution an independent lifetime.
	detached := context.WithoutCancel(parentCtx)
	ctx, cancel := context.WithTimeout(detached, 10*time.Minute)
	defer cancel()

	log := logging.FromContext(ctx).WithService("ProjectService")
	streamID := string(cmd.ID)
	var lastEventID string
	var outputSizeBytes int64

	// Dispatch command.started webhook event.
	s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), domain.WebhookEventCommandStarted, &domain.CommandEventData{
		CommandID:   string(cmd.ID),
		CommandType: cmd.Type,
		ProjectID:   string(cmd.ProjectID),
		StartedAt:   cmd.StartedAt,
	})

	// Execute via agent; the handler converts every agent event into a stream
	// event and publishes it.
	// NOTE(review): lastEventID and outputSizeBytes are written by the handler
	// without synchronization. This assumes the agent invokes the handler
	// serially and never after Execute has returned — confirm against the
	// CodeAgent implementations.
	result, execErr := agent.Execute(ctx, req, func(event domain.AgentEvent) {
		var eventType string
		var data map[string]any

		switch event.Type {
		case domain.AgentEventOutput:
			eventType = "output"
			data = map[string]any{
				"line":   event.Content,
				"stream": event.Stream,
			}
		case domain.AgentEventToolUse:
			eventType = "tool_use"
			data = map[string]any{
				"tool":  event.ToolName,
				"input": event.ToolInput,
			}
			// Record tool use metric; unnamed tools are not counted.
			if event.ToolName != "" {
				metrics.RecordAgentToolUse(string(agent.Provider()), event.ToolName)
			}
		case domain.AgentEventToolResult:
			eventType = "tool_result"
			data = map[string]any{
				"output": event.Content,
			}
		case domain.AgentEventError:
			eventType = "error"
			data = map[string]any{
				"error": event.Content,
			}
		case domain.AgentEventComplete:
			eventType = "agent_complete"
			data = event.Metadata
		default:
			// Unknown event types are relayed as plain stdout output.
			eventType = "output"
			data = map[string]any{
				"line":   event.Content,
				"stream": "stdout",
			}
		}

		lastEventID = s.streams.Publish(streamID, port.StreamEvent{
			Type: eventType,
			Data: data,
		})
		// Only the event's Content contributes to the reported output size.
		outputSizeBytes += int64(len(event.Content))
	})
	// NOTE(review): result is dereferenced below even when execErr is non-nil.
	// Presumably Execute always returns a usable result and mirrors failures in
	// result.Error — confirm the port.CodeAgent contract; a nil result would
	// panic here.

	// Decide once whether the command failed so that metrics, audit and webhook
	// cannot disagree. The error returned by Execute is used as a fallback
	// message when the result itself carries no error.
	failed := result.ExitCode != 0
	var errorMsg string
	if result.Error != nil {
		failed = true
		errorMsg = result.Error.Error()
	} else if execErr != nil {
		failed = true
		errorMsg = execErr.Error()
	}

	// Send completion event.
	completeEventID := s.streams.Publish(streamID, port.StreamEvent{
		Type: "complete",
		Data: map[string]any{
			"exit_code":   result.ExitCode,
			"duration_ms": result.DurationMs,
			"session_id":  result.SessionID,
			"provider":    agent.Provider(),
		},
	})

	// Record command-level and agent-specific metrics.
	status := "success"
	if failed {
		status = "error"
	}
	metrics.RecordCommand(string(cmd.ProjectID), string(cmd.Type), status, result.DurationMs)
	metrics.RecordAgentRequest(string(agent.Provider()), status, result.DurationMs)

	// The bookkeeping below must still happen when the command ran into the
	// execution deadline. ctx is already expired in that case, so switch to a
	// fresh, bounded context; otherwise keep using ctx unchanged.
	finalCtx := ctx
	if ctx.Err() != nil {
		var finalCancel context.CancelFunc
		finalCtx, finalCancel = context.WithTimeout(detached, time.Minute)
		defer finalCancel()
	}

	// Log audit completion if an audit logger is configured. A failure to write
	// the audit record is logged and otherwise ignored.
	if s.auditLogger != nil {
		auditStatus := domain.AuditStatusSuccess
		if failed {
			auditStatus = domain.AuditStatusError
		}
		auditResult := &domain.AuditResult{
			ExitCode:        result.ExitCode,
			DurationMs:      result.DurationMs,
			Status:          auditStatus,
			ErrorMessage:    errorMsg,
			OutputSizeBytes: outputSizeBytes,
		}
		if err := s.auditLogger.LogCommandEnd(finalCtx, string(cmd.ID), auditResult); err != nil {
			log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
		}
	}

	// Dispatch command.completed or command.failed webhook event.
	completedAt := time.Now()
	webhookEventType := domain.WebhookEventCommandCompleted
	if failed {
		webhookEventType = domain.WebhookEventCommandFailed
	}
	s.dispatchWebhookEvent(finalCtx, string(cmd.ProjectID), webhookEventType, &domain.CommandEventData{
		CommandID:   string(cmd.ID),
		CommandType: cmd.Type,
		ProjectID:   string(cmd.ProjectID),
		StartedAt:   cmd.StartedAt,
		CompletedAt: completedAt,
		ExitCode:    result.ExitCode,
		DurationMs:  result.DurationMs,
		Error:       errorMsg,
	})

	log.Debug("agent command completed",
		"command_id", cmd.ID,
		"provider", agent.Provider(),
		"session_id", result.SessionID,
		"exit_code", result.ExitCode,
		logging.FieldDuration, result.DurationMs,
		"last_event_id", lastEventID,
		"complete_event_id", completeEventID,
	)

	// Close the stream 30 seconds after completion rather than immediately.
	// time.AfterFunc avoids parking a goroutine in time.Sleep for the delay.
	time.AfterFunc(30*time.Second, func() {
		s.streams.Close(streamID)
	})
}
// GetAgentCapabilities looks up the agent registered for provider and returns
// a pointer to a copy of its capabilities.
//
// It returns nil when no agent registry is configured or when the registry has
// no agent for the requested provider.
func (s *ProjectService) GetAgentCapabilities(provider domain.AgentProvider) *domain.AgentCapabilities {
	if s.agentRegistry == nil {
		return nil
	}

	if agent := s.agentRegistry.Get(provider); agent != nil {
		// Capabilities() hands back a value; take the address of a local copy.
		caps := agent.Capabilities()
		return &caps
	}
	return nil
}
// ListAvailableAgents reports the agent providers the registry lists as
// available. It returns nil when no agent registry is configured.
func (s *ProjectService) ListAvailableAgents() []domain.AgentProvider {
	if registry := s.agentRegistry; registry != nil {
		return registry.Available()
	}
	return nil
}
// GetDefaultAgent reports the provider of the registry's default agent.
// It returns the empty provider ("") when no agent registry is configured or
// when the registry has no default agent.
func (s *ProjectService) GetDefaultAgent() domain.AgentProvider {
	if s.agentRegistry != nil {
		if agent := s.agentRegistry.Default(); agent != nil {
			return agent.Provider()
		}
	}
	return ""
}
// SetDefaultAgent asks the registry to make provider the default agent and
// returns the registry's result. Without a configured agent registry it
// returns domain.ErrInvalidAgentProvider.
func (s *ProjectService) SetDefaultAgent(provider domain.AgentProvider) error {
	if registry := s.agentRegistry; registry != nil {
		return registry.SetDefault(provider)
	}
	return domain.ErrInvalidAgentProvider
}