Implements weeks 1-4 of the multi-provider architecture: Week 1 - Foundation: - Add domain models (AgentProvider, AgentRequest, AgentEvent, AgentResult) - Define CodeAgent port interface with Execute, Cancel, Capabilities - Create thread-safe provider registry with first-registered default Week 2 - Claude Code Adapter: - Extract kubectl exec logic into CodeAgent implementation - Parse stream-json output format (init, message, tool_use, result) - Support session continuation via --resume flag Week 3 - OpenCode Adapter: - HTTP/SSE client for opencode serve API - Session management (create, send message, abort) - Event streaming with documented buffer rationale Week 4 - Quality & Polish: - Fix race condition in OpenCode Cancel method - Add AgentRequest.Validate() with ErrPromptRequired, ErrInvalidTimeout - Document DefaultAvailabilityTimeout constants - Add HTTP error context for debugging Also includes: - Work queue system with PostgreSQL adapter - Credential store for infrastructure secrets - Project templates with Woodpecker CI integration - Comprehensive test coverage Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
311 lines
8.3 KiB
Go
311 lines
8.3 KiB
Go
// Package opencode provides a CodeAgent implementation for the OpenCode server.
|
|
package opencode
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Client communicates with an OpenCode server via HTTP.
//
// Construct it with NewClient; the zero value has a nil HTTP client and is
// not usable.
type Client struct {
	// baseURL is the server root; request paths are appended to it verbatim.
	baseURL string
	// httpClient performs the requests issued by this Client.
	httpClient *http.Client
	// username and password are sent as HTTP basic auth, but only when
	// password is non-empty.
	username string
	password string
}
|
|
|
|
// ClientConfig configures the OpenCode client.
//
// Zero-valued BaseURL, Timeout and Username are replaced with defaults by
// NewClient.
type ClientConfig struct {
	// BaseURL is the root URL of the OpenCode server.
	BaseURL string
	// Timeout is installed as the timeout of the underlying http.Client.
	Timeout time.Duration
	// Username is the basic-auth user name.
	Username string
	// Password is the basic-auth password; when empty, no credentials are
	// attached to requests.
	Password string
}
|
|
|
|
// NewClient creates a new OpenCode HTTP client.
|
|
func NewClient(cfg ClientConfig) *Client {
|
|
if cfg.Timeout == 0 {
|
|
cfg.Timeout = 30 * time.Second
|
|
}
|
|
if cfg.BaseURL == "" {
|
|
cfg.BaseURL = "http://127.0.0.1:4096"
|
|
}
|
|
if cfg.Username == "" {
|
|
cfg.Username = "opencode"
|
|
}
|
|
|
|
return &Client{
|
|
baseURL: strings.TrimSuffix(cfg.BaseURL, "/"),
|
|
httpClient: &http.Client{
|
|
Timeout: cfg.Timeout,
|
|
},
|
|
username: cfg.Username,
|
|
password: cfg.Password,
|
|
}
|
|
}
|
|
|
|
// HealthResponse represents the /global/health response.
type HealthResponse struct {
	// Healthy is decoded from the "healthy" JSON field.
	Healthy bool `json:"healthy"`
	// Version is decoded from the "version" JSON field.
	Version string `json:"version"`
}
|
|
|
|
// Health checks if the OpenCode server is healthy.
|
|
func (c *Client) Health(ctx context.Context) (*HealthResponse, error) {
|
|
resp, err := c.doRequest(ctx, http.MethodGet, "/global/health", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
var health HealthResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
|
|
return nil, fmt.Errorf("decode health: %w", err)
|
|
}
|
|
return &health, nil
|
|
}
|
|
|
|
// Session represents an OpenCode session.
type Session struct {
	// ID identifies the session; the client methods use it to build
	// /session/<id> paths.
	ID string `json:"id"`
	// Title is omitted from JSON output when empty.
	Title string `json:"title,omitempty"`
	// ParentID is omitted from JSON output when empty.
	ParentID string `json:"parentID,omitempty"`
	// NOTE(review): encoding/json never treats a struct value as empty, so
	// omitempty has no effect here and a zero CreatedAt is still marshaled.
	CreatedAt time.Time `json:"createdAt,omitempty"`
}
|
|
|
|
// CreateSessionRequest is the body for POST /session.
//
// Both fields are optional and are left out of the JSON body when empty.
type CreateSessionRequest struct {
	ParentID string `json:"parentID,omitempty"`
	Title    string `json:"title,omitempty"`
}
|
|
|
|
// CreateSession creates a new session.
|
|
func (c *Client) CreateSession(ctx context.Context, req *CreateSessionRequest) (*Session, error) {
|
|
body, _ := json.Marshal(req)
|
|
resp, err := c.doRequest(ctx, http.MethodPost, "/session", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
var session Session
|
|
if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
|
|
return nil, fmt.Errorf("decode session: %w", err)
|
|
}
|
|
return &session, nil
|
|
}
|
|
|
|
// GetSession retrieves a session by ID.
|
|
func (c *Client) GetSession(ctx context.Context, sessionID string) (*Session, error) {
|
|
resp, err := c.doRequest(ctx, http.MethodGet, "/session/"+sessionID, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
var session Session
|
|
if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
|
|
return nil, fmt.Errorf("decode session: %w", err)
|
|
}
|
|
return &session, nil
|
|
}
|
|
|
|
// MessagePart represents a part of a message.
type MessagePart struct {
	// Type is the part kind: "text", "tool_use", "tool_result", etc.
	Type string `json:"type"`
	// Content is omitted from JSON output when empty.
	Content string `json:"content,omitempty"`
	// Name is omitted from JSON output when empty.
	Name string `json:"name,omitempty"`
	// Input holds an arbitrary JSON value and is omitted when nil.
	Input any `json:"input,omitempty"`
}
|
|
|
|
// SendMessageRequest is the body for POST /session/:id/message.
|
|
type SendMessageRequest struct {
|
|
MessageID string `json:"messageID,omitempty"`
|
|
Model string `json:"model,omitempty"`
|
|
Agent string `json:"agent,omitempty"`
|
|
System string `json:"system,omitempty"`
|
|
Tools []string `json:"tools,omitempty"`
|
|
Parts []MessagePart `json:"parts"`
|
|
}
|
|
|
|
// MessageInfo contains message metadata.
type MessageInfo struct {
	// ID is decoded from the "id" JSON field.
	ID string `json:"id"`
	// Role is decoded from the "role" JSON field.
	Role string `json:"role"`
	// Timestamp is decoded from the "timestamp" JSON field.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// SendMessageResponse is the response from POST /session/:id/message.
|
|
type SendMessageResponse struct {
|
|
Info MessageInfo `json:"info"`
|
|
Parts []MessagePart `json:"parts"`
|
|
}
|
|
|
|
// SendMessage sends a message to a session (synchronous, waits for response).
|
|
func (c *Client) SendMessage(ctx context.Context, sessionID string, req *SendMessageRequest) (*SendMessageResponse, error) {
|
|
body, _ := json.Marshal(req)
|
|
resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/message", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
var msgResp SendMessageResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&msgResp); err != nil {
|
|
return nil, fmt.Errorf("decode message response: %w", err)
|
|
}
|
|
return &msgResp, nil
|
|
}
|
|
|
|
// SendPromptAsync sends a message asynchronously.
|
|
func (c *Client) SendPromptAsync(ctx context.Context, sessionID string, req *SendMessageRequest) error {
|
|
body, _ := json.Marshal(req)
|
|
resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/prompt_async", bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
return nil
|
|
}
|
|
|
|
// AbortSession stops a running session.
|
|
func (c *Client) AbortSession(ctx context.Context, sessionID string) error {
|
|
resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/abort", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
return nil
|
|
}
|
|
|
|
// SSEEvent represents a server-sent event.
type SSEEvent struct {
	// Event is the value of the event's "event:" field, if any.
	Event string
	// Data is the value of the event's "data:" field, if any.
	Data string
}
|
|
|
|
// sseEventBufferSize is the capacity of the SSE event channel.
//
// The buffer absorbs typical bursts of events during agent execution (e.g.,
// rapid tool calls). If the consumer is slow and the buffer fills, the
// reading goroutine blocks on send until the consumer catches up or the
// context is cancelled.
const sseEventBufferSize = 100
|
|
|
|
// SubscribeEvents returns a channel of SSE events from the server.
|
|
func (c *Client) SubscribeEvents(ctx context.Context) (<-chan SSEEvent, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/event", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("Accept", "text/event-stream")
|
|
c.setAuth(req)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
_ = resp.Body.Close()
|
|
return nil, fmt.Errorf("SSE subscribe failed: %s", resp.Status)
|
|
}
|
|
|
|
events := make(chan SSEEvent, sseEventBufferSize)
|
|
|
|
go func() {
|
|
defer func() { _ = resp.Body.Close() }()
|
|
defer close(events)
|
|
|
|
reader := bufio.NewReader(resp.Body)
|
|
var currentEvent SSEEvent
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
// EOF is expected when connection closes; other errors are logged but not fatal
|
|
return
|
|
}
|
|
|
|
line = strings.TrimSuffix(line, "\n")
|
|
line = strings.TrimSuffix(line, "\r")
|
|
|
|
if line == "" {
|
|
// Empty line = end of event
|
|
if currentEvent.Event != "" || currentEvent.Data != "" {
|
|
select {
|
|
case events <- currentEvent:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
currentEvent = SSEEvent{}
|
|
}
|
|
continue
|
|
}
|
|
|
|
if strings.HasPrefix(line, "event:") {
|
|
currentEvent.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
|
|
} else if strings.HasPrefix(line, "data:") {
|
|
currentEvent.Data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
|
}
|
|
}
|
|
}()
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// RunShell executes a shell command in the session.
|
|
func (c *Client) RunShell(ctx context.Context, sessionID, command string) error {
|
|
body, _ := json.Marshal(map[string]string{"command": command})
|
|
resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/shell", bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
return nil
|
|
}
|
|
|
|
// doRequest performs an HTTP request with auth.
|
|
func (c *Client) doRequest(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) {
|
|
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
c.setAuth(req)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.StatusCode >= 400 {
|
|
bodyBytes, _ := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
return nil, fmt.Errorf("%s %s: HTTP %d: %s", method, path, resp.StatusCode, string(bodyBytes))
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// setAuth sets authentication headers if password is configured.
|
|
func (c *Client) setAuth(req *http.Request) {
|
|
if c.password != "" {
|
|
req.SetBasicAuth(c.username, c.password)
|
|
}
|
|
}
|