Implements weeks 1-4 of the multi-provider architecture:

Week 1 - Foundation:
- Add domain models (AgentProvider, AgentRequest, AgentEvent, AgentResult)
- Define CodeAgent port interface with Execute, Cancel, Capabilities
- Create thread-safe provider registry with first-registered default

Week 2 - Claude Code Adapter:
- Extract kubectl exec logic into CodeAgent implementation
- Parse stream-json output format (init, message, tool_use, result)
- Support session continuation via --resume flag

Week 3 - OpenCode Adapter:
- HTTP/SSE client for opencode serve API
- Session management (create, send message, abort)
- Event streaming with documented buffer rationale

Week 4 - Quality & Polish:
- Fix race condition in OpenCode Cancel method
- Add AgentRequest.Validate() with ErrPromptRequired, ErrInvalidTimeout
- Document DefaultAvailabilityTimeout constants
- Add HTTP error context for debugging

Also includes:
- Work queue system with PostgreSQL adapter
- Credential store for infrastructure secrets
- Project templates with Woodpecker CI integration
- Comprehensive test coverage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
89 lines
1.9 KiB
Go
89 lines
1.9 KiB
Go
// Package handlers provides HTTP handlers for the rdev API.
|
|
package handlers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
)
|
|
|
|
// streamManager fans events out to the subscribers of named SSE streams.
//
// Each stream ID maps to the channels of its current subscribers. Every
// method takes mu before touching streams, so a single streamManager may be
// shared between goroutines. It contains a mutex and must not be copied;
// create it with newStreamManager and pass it by pointer.
type streamManager struct {
	mu sync.RWMutex
	// streams holds the subscriber channels per stream ID. A key is present
	// only while the stream has at least one subscriber.
	streams map[string][]chan streamEvent
}

// streamEvent is one event queued for delivery to a subscriber.
type streamEvent struct {
	Type string         // event type, as passed to Send
	Data map[string]any // event payload, as passed to Send
}

// newStreamManager returns an empty streamManager ready for use.
func newStreamManager() *streamManager {
	return &streamManager{
		streams: make(map[string][]chan streamEvent),
	}
}

// Subscribe registers a new subscriber on streamID and returns the channel
// its events are delivered on.
//
// The channel is closed by Unsubscribe or Close; receivers should treat a
// closed channel as the end of the stream.
func (sm *streamManager) Subscribe(streamID string) chan streamEvent {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// Buffered so that Send never has to block on a slow reader; once the
	// buffer is full Send drops events for this subscriber instead.
	ch := make(chan streamEvent, 100)
	sm.streams[streamID] = append(sm.streams[streamID], ch)
	return ch
}

// Unsubscribe removes ch from streamID's subscribers and closes it.
//
// It is a no-op when ch is not currently subscribed to streamID (for example
// after Close, or on a repeated call), so a channel is never closed twice.
// When the last subscriber leaves, the stream's map entry is deleted so that
// idle stream IDs do not accumulate.
func (sm *streamManager) Unsubscribe(streamID string, ch chan streamEvent) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	subs := sm.streams[streamID]
	for i, c := range subs {
		if c != ch {
			continue
		}

		// Shift the tail down and clear the vacated slot so the backing
		// array does not keep the closed channel reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]

		if len(subs) == 0 {
			delete(sm.streams, streamID)
		} else {
			sm.streams[streamID] = subs
		}
		close(ch)
		return
	}
}

// Send delivers an event to every current subscriber of streamID without
// blocking.
//
// A subscriber whose buffer is full does not receive the event; it is dropped
// for that subscriber only. Sending to a stream with no subscribers does
// nothing.
func (sm *streamManager) Send(streamID, eventType string, data map[string]any) {
	// The read lock is held for the whole fan-out: Unsubscribe and Close
	// need the write lock to close a channel, so no channel can be closed
	// while it is being sent on here.
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	ev := streamEvent{Type: eventType, Data: data}
	for _, ch := range sm.streams[streamID] {
		select {
		case ch <- ev:
		default:
			// Channel full, skip this subscriber rather than stall the rest.
		}
	}
}

// Close closes the channel of every subscriber of streamID and forgets the
// stream. Closing an unknown stream is a no-op.
func (sm *streamManager) Close(streamID string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	for _, ch := range sm.streams[streamID] {
		close(ch)
	}
	delete(sm.streams, streamID)
}
|
|
|
|
// writeSSE writes a Server-Sent Event without an event ID.
//
// It is shorthand for writeSSEWithID with an empty id and shares its wire
// format, sanitising and error handling.
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data map[string]any) {
	writeSSEWithID(w, flusher, "", event, data)
}

// writeSSEWithID writes a Server-Sent Event with an optional event ID and
// flushes it.
//
// The frame written to w is
//
//	id: <id>        (omitted when id is empty)
//	event: <event>
//	data: <data encoded as JSON>
//
// followed by a blank line. Carriage returns and line feeds are removed from
// id and event, because a line break inside a field would end it early and
// corrupt the frame. If data cannot be encoded as JSON the payload falls back
// to the JSON literal null, so the data line always carries valid JSON.
//
// Write errors are ignored because the signature has no way to report them.
func writeSSEWithID(w http.ResponseWriter, flusher http.Flusher, id, event string, data map[string]any) {
	payload, err := json.Marshal(data)
	if err != nil {
		// Same payload a nil map produces; keeps the frame parseable.
		payload = []byte("null")
	}

	if id = stripLineBreaks(id); id != "" {
		_, _ = fmt.Fprintf(w, "id: %s\n", id)
	}
	_, _ = fmt.Fprintf(w, "event: %s\n", stripLineBreaks(event))
	_, _ = fmt.Fprintf(w, "data: %s\n\n", payload)
	flusher.Flush()
}

// stripLineBreaks returns s with every CR and LF byte removed.
func stripLineBreaks(s string) string {
	clean := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		// Byte-wise filtering is UTF-8 safe: CR and LF never occur inside a
		// multi-byte sequence.
		if c := s[i]; c != '\r' && c != '\n' {
			clean = append(clean, c)
		}
	}
	return string(clean)
}
|