Implements weeks 1-4 of the multi-provider architecture: Week 1 - Foundation: - Add domain models (AgentProvider, AgentRequest, AgentEvent, AgentResult) - Define CodeAgent port interface with Execute, Cancel, Capabilities - Create thread-safe provider registry with first-registered default Week 2 - Claude Code Adapter: - Extract kubectl exec logic into CodeAgent implementation - Parse stream-json output format (init, message, tool_use, result) - Support session continuation via --resume flag Week 3 - OpenCode Adapter: - HTTP/SSE client for opencode serve API - Session management (create, send message, abort) - Event streaming with documented buffer rationale Week 4 - Quality & Polish: - Fix race condition in OpenCode Cancel method - Add AgentRequest.Validate() with ErrPromptRequired, ErrInvalidTimeout - Document DefaultAvailabilityTimeout constants - Add HTTP error context for debugging Also includes: - Work queue system with PostgreSQL adapter - Credential store for infrastructure secrets - Project templates with Woodpecker CI integration - Comprehensive test coverage Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
109 lines
3.2 KiB
Go
109 lines
3.2 KiB
Go
// Package service provides business logic / use cases for the application.
|
|
// This file contains command queue functionality for async task execution.
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
)
|
|
|
|
// EnqueueCommandRequest contains parameters for enqueueing a command.
|
|
type EnqueueCommandRequest struct {
|
|
ProjectID domain.ProjectID
|
|
Command string
|
|
CommandType domain.CommandType
|
|
WorkingDir string
|
|
Priority int
|
|
Audit *AuditContext
|
|
}
|
|
|
|
// EnqueueCommandResult contains the result of enqueueing a command.
|
|
type EnqueueCommandResult struct {
|
|
CommandID domain.QueuedCommandID
|
|
StreamURL string
|
|
Position int
|
|
}
|
|
|
|
// EnqueueCommand adds a command to the project's queue for async execution.
|
|
// Returns an error if no queue is configured.
|
|
func (s *ProjectService) EnqueueCommand(ctx context.Context, req EnqueueCommandRequest) (*EnqueueCommandResult, error) {
|
|
if s.queue == nil {
|
|
return nil, fmt.Errorf("command queue not configured")
|
|
}
|
|
|
|
// Validate project exists
|
|
exists, err := s.projects.Exists(ctx, req.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !exists {
|
|
return nil, domain.ErrProjectNotFound
|
|
}
|
|
|
|
// Create queued command
|
|
cmd := &domain.QueuedCommand{
|
|
ProjectID: string(req.ProjectID),
|
|
Command: req.Command,
|
|
CommandType: req.CommandType,
|
|
WorkingDir: req.WorkingDir,
|
|
Status: domain.QueueStatusPending,
|
|
Priority: req.Priority,
|
|
}
|
|
if req.Audit != nil {
|
|
cmd.APIKeyID = req.Audit.APIKeyID
|
|
}
|
|
|
|
// Enqueue
|
|
if err := s.queue.Enqueue(ctx, cmd); err != nil {
|
|
return nil, fmt.Errorf("enqueue command: %w", err)
|
|
}
|
|
|
|
// Get approximate position
|
|
pendingStatus := domain.QueueStatusPending
|
|
pending, _ := s.queue.List(ctx, string(req.ProjectID), &domain.QueueFilters{
|
|
Status: &pendingStatus,
|
|
Limit: 1000,
|
|
SortOrder: "asc",
|
|
})
|
|
|
|
return &EnqueueCommandResult{
|
|
CommandID: cmd.ID,
|
|
StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", req.ProjectID, cmd.ID),
|
|
Position: len(pending),
|
|
}, nil
|
|
}
|
|
|
|
// GetQueuedCommand retrieves a queued command by ID.
|
|
func (s *ProjectService) GetQueuedCommand(ctx context.Context, cmdID domain.QueuedCommandID) (*domain.QueuedCommand, error) {
|
|
if s.queue == nil {
|
|
return nil, fmt.Errorf("command queue not configured")
|
|
}
|
|
return s.queue.GetByID(ctx, cmdID)
|
|
}
|
|
|
|
// ListQueuedCommands returns queued commands for a project.
|
|
func (s *ProjectService) ListQueuedCommands(ctx context.Context, projectID domain.ProjectID, filters *domain.QueueFilters) ([]*domain.QueuedCommand, error) {
|
|
if s.queue == nil {
|
|
return nil, fmt.Errorf("command queue not configured")
|
|
}
|
|
return s.queue.List(ctx, string(projectID), filters)
|
|
}
|
|
|
|
// CancelQueuedCommand cancels a pending queued command.
|
|
func (s *ProjectService) CancelQueuedCommand(ctx context.Context, cmdID domain.QueuedCommandID) error {
|
|
if s.queue == nil {
|
|
return fmt.Errorf("command queue not configured")
|
|
}
|
|
return s.queue.Cancel(ctx, cmdID)
|
|
}
|
|
|
|
// GetQueueStats returns queue statistics for a project.
|
|
func (s *ProjectService) GetQueueStats(ctx context.Context, projectID domain.ProjectID) (*domain.QueueStats, error) {
|
|
if s.queue == nil {
|
|
return nil, fmt.Errorf("command queue not configured")
|
|
}
|
|
return s.queue.GetStats(ctx, string(projectID))
|
|
}
|