Three coordinated fixes for CI pipeline race conditions:
1. Woodpecker step dependencies: Added depends_on: [deps] to all 6 component
templates (service, worker, cli, app-astro, app-react, app-nextjs) so build
steps wait for go work sync to complete.
2. Idempotent resource provisioning: Modified provisionResources() to check
for existing database/cache before creating, preventing "already exists"
errors on component re-adds.
3. Batch component endpoint: POST /projects/{id}/components/batch enables
atomic multi-component additions in a single git commit. Validates all
components upfront, provisions infra sequentially, commits code components
atomically.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
235 lines
6.6 KiB
Go
235 lines
6.6 KiB
Go
// Package service provides business logic / use cases for the application.
|
|
// This file contains CodeAgent integration for multi-provider support.
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/logging"
|
|
"github.com/orchard9/rdev/internal/metrics"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// resolveAgent returns the appropriate CodeAgent for a project.
|
|
// Returns nil if no agent registry is configured or no agent is available.
|
|
func (s *ProjectService) resolveAgent(project *domain.Project) port.CodeAgent {
|
|
if s.agentRegistry == nil {
|
|
return nil
|
|
}
|
|
|
|
// Try project-specific agent first
|
|
if project.AgentProvider != "" {
|
|
if agent := s.agentRegistry.Get(project.AgentProvider); agent != nil {
|
|
return agent
|
|
}
|
|
}
|
|
|
|
// Fall back to default
|
|
return s.agentRegistry.Default()
|
|
}
|
|
|
|
// executeAgentCommand runs a command via CodeAgent and streams output.
|
|
// Uses context.WithoutCancel to preserve tracing/values but allow independent timeout.
|
|
func (s *ProjectService) executeAgentCommand(parentCtx context.Context, agent port.CodeAgent, req *domain.AgentRequest, cmd *domain.Command) {
|
|
// Derive from parent to preserve tracing/values, but with independent cancellation
|
|
ctx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
log := logging.FromContext(ctx).WithService("ProjectService")
|
|
streamID := string(cmd.ID)
|
|
var lastEventID string
|
|
var outputSizeBytes int64
|
|
|
|
// Dispatch command.started webhook event
|
|
s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), domain.WebhookEventCommandStarted, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.Type,
|
|
ProjectID: string(cmd.ProjectID),
|
|
StartedAt: cmd.StartedAt,
|
|
})
|
|
|
|
// Execute via agent with event handler
|
|
result, _ := agent.Execute(ctx, req, func(event domain.AgentEvent) {
|
|
// Convert agent event to stream event
|
|
var eventType string
|
|
var data map[string]any
|
|
|
|
switch event.Type {
|
|
case domain.AgentEventOutput:
|
|
eventType = "output"
|
|
data = map[string]any{
|
|
"line": event.Content,
|
|
"stream": event.Stream,
|
|
}
|
|
case domain.AgentEventToolUse:
|
|
eventType = "tool_use"
|
|
data = map[string]any{
|
|
"tool": event.ToolName,
|
|
"input": event.ToolInput,
|
|
}
|
|
// Record tool use metric
|
|
if event.ToolName != "" {
|
|
metrics.RecordAgentToolUse(string(agent.Provider()), event.ToolName)
|
|
}
|
|
case domain.AgentEventToolResult:
|
|
eventType = "tool_result"
|
|
data = map[string]any{
|
|
"output": event.Content,
|
|
}
|
|
case domain.AgentEventError:
|
|
eventType = "error"
|
|
data = map[string]any{
|
|
"error": event.Content,
|
|
}
|
|
case domain.AgentEventComplete:
|
|
eventType = "agent_complete"
|
|
data = event.Metadata
|
|
default:
|
|
eventType = "output"
|
|
data = map[string]any{
|
|
"line": event.Content,
|
|
"stream": "stdout",
|
|
}
|
|
}
|
|
|
|
eventID := s.streams.Publish(streamID, port.StreamEvent{
|
|
Type: eventType,
|
|
Data: data,
|
|
})
|
|
lastEventID = eventID
|
|
outputSizeBytes += int64(len(event.Content))
|
|
})
|
|
|
|
// Send completion event
|
|
eventID := s.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "complete",
|
|
Data: map[string]any{
|
|
"exit_code": result.ExitCode,
|
|
"duration_ms": result.DurationMs,
|
|
"session_id": result.SessionID,
|
|
"provider": agent.Provider(),
|
|
},
|
|
})
|
|
|
|
// Record metrics
|
|
status := "success"
|
|
if result.ExitCode != 0 {
|
|
status = "error"
|
|
}
|
|
metrics.RecordCommand(string(cmd.ProjectID), string(cmd.Type), status, result.DurationMs)
|
|
|
|
// Record agent-specific metrics
|
|
metrics.RecordAgentRequest(string(agent.Provider()), status, result.DurationMs)
|
|
|
|
// Log audit completion if audit logger is configured
|
|
if s.auditLogger != nil {
|
|
var auditStatus domain.AuditStatus
|
|
var errorMsg string
|
|
if result.Error != nil {
|
|
auditStatus = domain.AuditStatusError
|
|
errorMsg = result.Error.Error()
|
|
} else if result.ExitCode != 0 {
|
|
auditStatus = domain.AuditStatusError
|
|
} else {
|
|
auditStatus = domain.AuditStatusSuccess
|
|
}
|
|
|
|
auditResult := &domain.AuditResult{
|
|
ExitCode: result.ExitCode,
|
|
DurationMs: result.DurationMs,
|
|
Status: auditStatus,
|
|
ErrorMessage: errorMsg,
|
|
OutputSizeBytes: outputSizeBytes,
|
|
}
|
|
if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil {
|
|
log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
|
|
}
|
|
}
|
|
|
|
// Dispatch command.completed or command.failed webhook event
|
|
completedAt := time.Now()
|
|
var webhookEventType domain.WebhookEventType
|
|
var errorMsg string
|
|
if result.Error != nil {
|
|
webhookEventType = domain.WebhookEventCommandFailed
|
|
errorMsg = result.Error.Error()
|
|
} else if result.ExitCode != 0 {
|
|
webhookEventType = domain.WebhookEventCommandFailed
|
|
} else {
|
|
webhookEventType = domain.WebhookEventCommandCompleted
|
|
}
|
|
|
|
s.dispatchWebhookEvent(ctx, string(cmd.ProjectID), webhookEventType, &domain.CommandEventData{
|
|
CommandID: string(cmd.ID),
|
|
CommandType: cmd.Type,
|
|
ProjectID: string(cmd.ProjectID),
|
|
StartedAt: cmd.StartedAt,
|
|
CompletedAt: completedAt,
|
|
ExitCode: result.ExitCode,
|
|
DurationMs: result.DurationMs,
|
|
Error: errorMsg,
|
|
})
|
|
|
|
log.Debug("agent command completed",
|
|
"command_id", cmd.ID,
|
|
"provider", agent.Provider(),
|
|
"session_id", result.SessionID,
|
|
"exit_code", result.ExitCode,
|
|
logging.FieldDuration, result.DurationMs,
|
|
"last_event_id", lastEventID,
|
|
"complete_event_id", eventID,
|
|
)
|
|
|
|
// Clean up stream after a delay
|
|
go func() {
|
|
time.Sleep(30 * time.Second)
|
|
s.streams.Close(streamID)
|
|
}()
|
|
}
|
|
|
|
// GetAgentCapabilities returns the capabilities for a specific agent provider.
|
|
// Returns nil if no agent registry is configured or the provider is not found.
|
|
func (s *ProjectService) GetAgentCapabilities(provider domain.AgentProvider) *domain.AgentCapabilities {
|
|
if s.agentRegistry == nil {
|
|
return nil
|
|
}
|
|
|
|
agent := s.agentRegistry.Get(provider)
|
|
if agent == nil {
|
|
return nil
|
|
}
|
|
|
|
caps := agent.Capabilities()
|
|
return &caps
|
|
}
|
|
|
|
// ListAvailableAgents returns all available agent providers.
|
|
func (s *ProjectService) ListAvailableAgents() []domain.AgentProvider {
|
|
if s.agentRegistry == nil {
|
|
return nil
|
|
}
|
|
return s.agentRegistry.Available()
|
|
}
|
|
|
|
// GetDefaultAgent returns the default agent provider.
|
|
func (s *ProjectService) GetDefaultAgent() domain.AgentProvider {
|
|
if s.agentRegistry == nil {
|
|
return ""
|
|
}
|
|
agent := s.agentRegistry.Default()
|
|
if agent == nil {
|
|
return ""
|
|
}
|
|
return agent.Provider()
|
|
}
|
|
|
|
// SetDefaultAgent sets the default agent provider.
|
|
func (s *ProjectService) SetDefaultAgent(provider domain.AgentProvider) error {
|
|
if s.agentRegistry == nil {
|
|
return domain.ErrInvalidAgentProvider
|
|
}
|
|
return s.agentRegistry.SetDefault(provider)
|
|
}
|