rdev/internal/adapter/codeagent/opencode/client.go
jordan 39df51defd feat: Add multi-provider code agent interface with Claude Code and OpenCode adapters
Implements weeks 1-4 of the multi-provider architecture:

Week 1 - Foundation:
- Add domain models (AgentProvider, AgentRequest, AgentEvent, AgentResult)
- Define CodeAgent port interface with Execute, Cancel, Capabilities
- Create thread-safe provider registry with first-registered default

Week 2 - Claude Code Adapter:
- Extract kubectl exec logic into CodeAgent implementation
- Parse stream-json output format (init, message, tool_use, result)
- Support session continuation via --resume flag

Week 3 - OpenCode Adapter:
- HTTP/SSE client for opencode serve API
- Session management (create, send message, abort)
- Event streaming with documented buffer rationale

Week 4 - Quality & Polish:
- Fix race condition in OpenCode Cancel method
- Add AgentRequest.Validate() with ErrPromptRequired, ErrInvalidTimeout
- Document DefaultAvailabilityTimeout constants
- Add HTTP error context for debugging

Also includes:
- Work queue system with PostgreSQL adapter
- Credential store for infrastructure secrets
- Project templates with Woodpecker CI integration
- Comprehensive test coverage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 09:25:51 -07:00

311 lines
8.3 KiB
Go

// Package opencode provides a CodeAgent implementation for the OpenCode server.
package opencode
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Client is an HTTP client for an OpenCode server. Construct it with
// NewClient so that defaults are applied.
type Client struct {
	// baseURL is prepended to every request path; NewClient stores it with
	// one trailing "/" removed.
	baseURL string
	// httpClient performs all requests issued by this client.
	httpClient *http.Client
	// username and password are sent as HTTP basic auth credentials, but only
	// when password is non-empty.
	username string
	password string
}
// ClientConfig holds the settings accepted by NewClient. Zero values select
// the defaults noted on each field.
type ClientConfig struct {
	// BaseURL is the server address; empty means "http://127.0.0.1:4096".
	BaseURL string
	// Timeout becomes the http.Client timeout; zero means 30 seconds.
	Timeout time.Duration
	// Username is the basic auth user; empty means "opencode".
	Username string
	// Password is the basic auth password; when empty no credentials are sent.
	Password string
}
// NewClient builds a Client from cfg, filling in defaults for unset fields:
// a 30 second timeout, the base URL "http://127.0.0.1:4096", and the
// username "opencode". A single trailing "/" on the base URL is removed so
// request paths can be appended directly.
func NewClient(cfg ClientConfig) *Client {
	timeout := cfg.Timeout
	if timeout == 0 {
		timeout = 30 * time.Second
	}

	base := cfg.BaseURL
	if base == "" {
		base = "http://127.0.0.1:4096"
	}

	user := cfg.Username
	if user == "" {
		user = "opencode"
	}

	client := &Client{
		baseURL:  strings.TrimSuffix(base, "/"),
		username: user,
		password: cfg.Password,
	}
	client.httpClient = &http.Client{Timeout: timeout}
	return client
}
// HealthResponse is the JSON payload decoded from GET /global/health.
type HealthResponse struct {
	// Healthy is decoded from the "healthy" key.
	Healthy bool `json:"healthy"`
	// Version is decoded from the "version" key.
	Version string `json:"version"`
}
// Health queries GET /global/health and returns the decoded response.
// It returns an error if the request fails, the server answers with a status
// of 400 or above, or the body is not valid JSON for HealthResponse.
func (c *Client) Health(ctx context.Context) (*HealthResponse, error) {
	resp, err := c.doRequest(ctx, http.MethodGet, "/global/health", nil)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	status := new(HealthResponse)
	if decodeErr := json.NewDecoder(resp.Body).Decode(status); decodeErr != nil {
		return nil, fmt.Errorf("decode health: %w", decodeErr)
	}
	return status, nil
}
// Session is the JSON representation of an OpenCode session as returned by
// the /session endpoints.
type Session struct {
	// ID identifies the session; it is the value used in /session/<id> paths.
	ID string `json:"id"`
	// Title is omitted from JSON when empty.
	Title string `json:"title,omitempty"`
	// ParentID is omitted from JSON when empty.
	ParentID string `json:"parentID,omitempty"`
	// CreatedAt is decoded from "createdAt". Note that omitempty has no effect
	// on struct values such as time.Time, so a zero CreatedAt is still encoded.
	CreatedAt time.Time `json:"createdAt,omitempty"`
}
// CreateSessionRequest is the JSON body sent with POST /session. Both fields
// are optional and are left out of the body when empty.
type CreateSessionRequest struct {
	// ParentID is encoded as "parentID".
	ParentID string `json:"parentID,omitempty"`
	// Title is encoded as "title".
	Title string `json:"title,omitempty"`
}
// CreateSession creates a new session via POST /session and returns the
// session decoded from the response.
//
// A nil req is treated as an empty request (no parent, no title) so that the
// server receives a JSON object rather than the literal null. An error is
// returned if the request cannot be encoded, the HTTP call fails, the server
// answers with a status of 400 or above, or the response is not valid JSON.
func (c *Client) CreateSession(ctx context.Context, req *CreateSessionRequest) (*Session, error) {
	if req == nil {
		req = &CreateSessionRequest{}
	}
	body, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("encode session request: %w", err)
	}

	resp, err := c.doRequest(ctx, http.MethodPost, "/session", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	var session Session
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return nil, fmt.Errorf("decode session: %w", err)
	}
	return &session, nil
}
// GetSession fetches the session identified by sessionID via
// GET /session/<sessionID>. The ID is appended to the path as given.
// It returns an error if the request fails, the server answers with a status
// of 400 or above, or the body cannot be decoded into a Session.
func (c *Client) GetSession(ctx context.Context, sessionID string) (*Session, error) {
	endpoint := "/session/" + sessionID

	resp, err := c.doRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	found := new(Session)
	if decodeErr := json.NewDecoder(resp.Body).Decode(found); decodeErr != nil {
		return nil, fmt.Errorf("decode session: %w", decodeErr)
	}
	return found, nil
}
// MessagePart is one element of a message's "parts" array, used in both
// request and response bodies.
type MessagePart struct {
	// Type discriminates the part: "text", "tool_use", "tool_result", etc.
	Type string `json:"type"`
	// Content is omitted from JSON when empty.
	// NOTE(review): presumably the text payload for "text" parts — confirm.
	Content string `json:"content,omitempty"`
	// Name is omitted from JSON when empty.
	// NOTE(review): presumably the tool name for tool parts — confirm.
	Name string `json:"name,omitempty"`
	// Input carries arbitrary JSON and is omitted when nil.
	// NOTE(review): presumably the tool arguments — confirm.
	Input any `json:"input,omitempty"`
}
// SendMessageRequest is the JSON body for POST /session/:id/message; the
// same type is also sent to /session/:id/prompt_async by SendPromptAsync.
// Every field except Parts is left out of the body when empty.
type SendMessageRequest struct {
	// MessageID is encoded as "messageID".
	MessageID string `json:"messageID,omitempty"`
	// Model is encoded as "model".
	Model string `json:"model,omitempty"`
	// Agent is encoded as "agent".
	Agent string `json:"agent,omitempty"`
	// System is encoded as "system".
	System string `json:"system,omitempty"`
	// Tools is encoded as "tools".
	Tools []string `json:"tools,omitempty"`
	// Parts is always encoded; a nil slice is sent as JSON null.
	Parts []MessagePart `json:"parts"`
}
// MessageInfo is the metadata object found under the "info" key of a
// message response.
type MessageInfo struct {
	// ID is decoded from "id".
	ID string `json:"id"`
	// Role is decoded from "role".
	Role string `json:"role"`
	// Timestamp is decoded from "timestamp".
	Timestamp time.Time `json:"timestamp"`
}
// SendMessageResponse is the JSON body returned by POST /session/:id/message.
type SendMessageResponse struct {
	// Info is the message metadata, decoded from "info".
	Info MessageInfo `json:"info"`
	// Parts is the message content, decoded from "parts".
	Parts []MessagePart `json:"parts"`
}
// SendMessage posts req to /session/<sessionID>/message and returns the
// decoded reply. The call is synchronous: it blocks until the server responds,
// subject to ctx and the timeout configured on the client's http.Client.
//
// An error is returned if req cannot be encoded as JSON (MessagePart.Input
// accepts arbitrary values, some of which are not encodable), the HTTP call
// fails, the server answers with a status of 400 or above, or the response
// body cannot be decoded.
func (c *Client) SendMessage(ctx context.Context, sessionID string, req *SendMessageRequest) (*SendMessageResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		// Without this check an unencodable request would be sent as an
		// empty body and fail later with a less useful server-side error.
		return nil, fmt.Errorf("encode message request: %w", err)
	}

	resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/message", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	var msgResp SendMessageResponse
	if err := json.NewDecoder(resp.Body).Decode(&msgResp); err != nil {
		return nil, fmt.Errorf("decode message response: %w", err)
	}
	return &msgResp, nil
}
// SendPromptAsync posts req to /session/<sessionID>/prompt_async and returns
// as soon as the server has answered with a status below 400. The response
// body is closed without being read.
//
// An error is returned if req cannot be encoded as JSON (MessagePart.Input
// accepts arbitrary values, some of which are not encodable), the HTTP call
// fails, or the server answers with a status of 400 or above.
func (c *Client) SendPromptAsync(ctx context.Context, sessionID string, req *SendMessageRequest) error {
	body, err := json.Marshal(req)
	if err != nil {
		// Without this check an unencodable request would be sent as an
		// empty body and fail later with a less useful server-side error.
		return fmt.Errorf("encode prompt request: %w", err)
	}

	resp, err := c.doRequest(ctx, http.MethodPost, "/session/"+sessionID+"/prompt_async", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer func() { _ = resp.Body.Close() }()
	return nil
}
// AbortSession asks the server to stop the given session by issuing
// POST /session/<sessionID>/abort with no body. The response body is closed
// without being read. An error is returned if the request fails or the server
// answers with a status of 400 or above.
func (c *Client) AbortSession(ctx context.Context, sessionID string) error {
	endpoint := "/session/" + sessionID + "/abort"

	resp, err := c.doRequest(ctx, http.MethodPost, endpoint, nil)
	if err == nil {
		// Only a successful call hands back a response that needs closing.
		_ = resp.Body.Close()
	}
	return err
}
// SSEEvent is a single server-sent event as delivered by SubscribeEvents.
type SSEEvent struct {
	// Event is the value of the event's "event:" field, if any.
	Event string
	// Data is the value of the event's "data:" field, if any.
	Data string
}
// sseEventBufferSize is the capacity of the buffered event channel created by
// SubscribeEvents.
// The buffer absorbs typical bursts of events during agent execution (e.g.,
// rapid tool calls). If the consumer is slow and the buffer fills, the reader
// goroutine blocks on send until the consumer catches up or the context is
// cancelled.
const sseEventBufferSize = 100
// SubscribeEvents opens the server's /event stream (Server-Sent Events) and
// returns a channel on which parsed events are delivered.
//
// The request is bound to ctx: cancelling ctx tears down the connection and
// stops the background reader. The channel is closed when ctx is cancelled,
// the server closes the stream, or a read error occurs; read errors are not
// reported to the caller, and a partially received event is discarded.
//
// Because the stream is long-lived, the request is issued through a copy of
// the client's http.Client with Timeout disabled. http.Client.Timeout also
// covers reading the response body and would otherwise cut the subscription
// off after the configured timeout. ctx is therefore the only bound on how
// long the subscription may take to establish or stay open.
//
// Parsing: "event:" sets the event name, "data:" lines are accumulated and
// joined with "\n" when an event carries several of them, and a blank line
// dispatches the event if it has a name or data. Other lines are ignored.
func (c *Client) SubscribeEvents(ctx context.Context) (<-chan SSEEvent, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/event", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	c.setAuth(req)

	// A shallow copy shares the Transport (and its connection pool) while
	// dropping the whole-request timeout for this call only.
	streamClient := *c.httpClient
	streamClient.Timeout = 0

	resp, err := streamClient.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		_ = resp.Body.Close()
		return nil, fmt.Errorf("SSE subscribe failed: %s", resp.Status)
	}

	events := make(chan SSEEvent, sseEventBufferSize)
	go func() {
		defer func() { _ = resp.Body.Close() }()
		defer close(events)

		reader := bufio.NewReader(resp.Body)
		var (
			current SSEEvent
			hasData bool // whether a data: line was already seen for current
		)
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			line, err := reader.ReadString('\n')
			if err != nil {
				// io.EOF (server closed the stream), cancellation and
				// transport errors all end the subscription.
				return
			}
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")

			switch {
			case line == "":
				// A blank line terminates the current event.
				if current.Event != "" || current.Data != "" {
					select {
					case events <- current:
					case <-ctx.Done():
						return
					}
				}
				current, hasData = SSEEvent{}, false
			case strings.HasPrefix(line, "event:"):
				current.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
			case strings.HasPrefix(line, "data:"):
				chunk := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
				if hasData {
					// Multi-line payload: keep earlier lines instead of
					// overwriting them.
					current.Data += "\n" + chunk
				} else {
					current.Data = chunk
					hasData = true
				}
			}
		}
	}()
	return events, nil
}
// RunShell submits command to the session's shell endpoint by issuing
// POST /session/<sessionID>/shell with the body {"command": <command>}.
// The response body is closed without being read. An error is returned if the
// request fails or the server answers with a status of 400 or above.
func (c *Client) RunShell(ctx context.Context, sessionID, command string) error {
	// Encoding a struct with a single string field cannot fail, so the error
	// is intentionally ignored.
	payload, _ := json.Marshal(struct {
		Command string `json:"command"`
	}{Command: command})

	endpoint := "/session/" + sessionID + "/shell"
	resp, err := c.doRequest(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err == nil {
		// Only a successful call hands back a response that needs closing.
		_ = resp.Body.Close()
	}
	return err
}
// doRequest sends an authenticated request for path (relative to the client's
// base URL) and returns the raw response.
//
// When body is non-nil the request is labelled as application/json. A
// response with status 400 or above is turned into an error that carries the
// method, path, status code and response body; in that case the response body
// is closed here. On success the caller owns resp.Body and must close it.
func (c *Client) doRequest(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) {
	httpReq, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, body)
	if err != nil {
		return nil, err
	}
	if body != nil {
		httpReq.Header.Set("Content-Type", "application/json")
	}
	c.setAuth(httpReq)

	httpResp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	if httpResp.StatusCode < 400 {
		return httpResp, nil
	}

	// Error status: capture whatever the server said (best effort) so the
	// caller gets it in the error text, then release the connection.
	detail, _ := io.ReadAll(httpResp.Body)
	_ = httpResp.Body.Close()
	return nil, fmt.Errorf("%s %s: HTTP %d: %s", method, path, httpResp.StatusCode, string(detail))
}
// setAuth attaches HTTP basic auth credentials to req. It does nothing when
// no password is configured.
func (c *Client) setAuth(req *http.Request) {
	if c.password == "" {
		return
	}
	req.SetBasicAuth(c.username, c.password)
}