Implements weeks 1-4 of the multi-provider architecture: Week 1 - Foundation: - Add domain models (AgentProvider, AgentRequest, AgentEvent, AgentResult) - Define CodeAgent port interface with Execute, Cancel, Capabilities - Create thread-safe provider registry with first-registered default Week 2 - Claude Code Adapter: - Extract kubectl exec logic into CodeAgent implementation - Parse stream-json output format (init, message, tool_use, result) - Support session continuation via --resume flag Week 3 - OpenCode Adapter: - HTTP/SSE client for opencode serve API - Session management (create, send message, abort) - Event streaming with documented buffer rationale Week 4 - Quality & Polish: - Fix race condition in OpenCode Cancel method - Add AgentRequest.Validate() with ErrPromptRequired, ErrInvalidTimeout - Document DefaultAvailabilityTimeout constants - Add HTTP error context for debugging Also includes: - Work queue system with PostgreSQL adapter - Credential store for infrastructure secrets - Project templates with Woodpecker CI integration - Comprehensive test coverage Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
423 lines
11 KiB
Go
423 lines
11 KiB
Go
package kubernetes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/kubernetes"
|
|
)
|
|
|
|
// ProjectRepository implements port.ProjectRepository using Kubernetes.
|
|
type ProjectRepository struct {
|
|
namespace string
|
|
client *kubernetes.Clientset
|
|
logger *slog.Logger
|
|
|
|
projects map[domain.ProjectID]*domain.Project
|
|
mu sync.RWMutex
|
|
|
|
// Watch management
|
|
watchCancel context.CancelFunc
|
|
watchWg sync.WaitGroup
|
|
}
|
|
|
|
// NewProjectRepository creates a new Kubernetes project repository.
|
|
// If client is nil, falls back to hardcoded projects (for local development).
|
|
func NewProjectRepository(namespace string) *ProjectRepository {
|
|
return NewProjectRepositoryWithClient(namespace, nil, nil)
|
|
}
|
|
|
|
// NewProjectRepositoryWithClient creates a new Kubernetes project repository
|
|
// with an optional K8s client for dynamic project discovery.
|
|
func NewProjectRepositoryWithClient(namespace string, client *kubernetes.Clientset, logger *slog.Logger) *ProjectRepository {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
|
|
r := &ProjectRepository{
|
|
namespace: namespace,
|
|
client: client,
|
|
logger: logger,
|
|
projects: make(map[domain.ProjectID]*domain.Project),
|
|
}
|
|
|
|
// Initialize with fallback hardcoded projects
|
|
// These will be replaced by discovered projects if K8s client is available
|
|
r.initFallbackProjects()
|
|
|
|
return r
|
|
}
|
|
|
|
// initFallbackProjects adds hardcoded projects for when K8s client is unavailable.
|
|
func (r *ProjectRepository) initFallbackProjects() {
|
|
r.projects["pantheon"] = &domain.Project{
|
|
ID: "pantheon",
|
|
Name: "Pantheon",
|
|
Description: "Go API backend",
|
|
PodName: "claudebox-pantheon-0",
|
|
Status: domain.ProjectStatusUnknown,
|
|
Workspace: "/workspace",
|
|
}
|
|
r.projects["aeries"] = &domain.Project{
|
|
ID: "aeries",
|
|
Name: "Aeries",
|
|
Description: "Note community platform",
|
|
PodName: "claudebox-aeries-0",
|
|
Status: domain.ProjectStatusUnknown,
|
|
Workspace: "/workspace",
|
|
}
|
|
}
|
|
|
|
// Ensure ProjectRepository implements port.ProjectRepository at compile time.
|
|
// The value is assigned to the blank identifier and never used; the line
// exists only so the build fails if *ProjectRepository stops satisfying
// port.ProjectRepository.
var _ port.ProjectRepository = (*ProjectRepository)(nil)
|
|
|
|
// List returns all available projects.
|
|
func (r *ProjectRepository) List(ctx context.Context) ([]domain.Project, error) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
projects := make([]domain.Project, 0, len(r.projects))
|
|
for _, p := range r.projects {
|
|
projects = append(projects, *p)
|
|
}
|
|
return projects, nil
|
|
}
|
|
|
|
// Get returns a project by ID.
|
|
func (r *ProjectRepository) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
p, ok := r.projects[id]
|
|
if !ok {
|
|
return nil, domain.ErrProjectNotFound
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
// Exists checks if a project exists.
|
|
func (r *ProjectRepository) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
_, ok := r.projects[id]
|
|
return ok, nil
|
|
}
|
|
|
|
// Register adds a new project to the repository.
|
|
func (r *ProjectRepository) Register(ctx context.Context, project *domain.Project) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
r.projects[project.ID] = project
|
|
return nil
|
|
}
|
|
|
|
// Unregister removes a project from the repository.
|
|
func (r *ProjectRepository) Unregister(ctx context.Context, id domain.ProjectID) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
delete(r.projects, id)
|
|
return nil
|
|
}
|
|
|
|
// RefreshStatus updates the status of all projects from K8s.
|
|
// If K8s client is available, it also discovers new projects.
|
|
func (r *ProjectRepository) RefreshStatus(ctx context.Context) error {
|
|
// Try to discover projects from K8s labels first
|
|
if r.client != nil {
|
|
if err := r.discoverProjects(ctx); err != nil {
|
|
r.logger.Warn("failed to discover projects from K8s, using fallback", "error", err)
|
|
}
|
|
}
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
for _, p := range r.projects {
|
|
status, err := r.getPodStatus(ctx, p.PodName)
|
|
if err != nil {
|
|
p.Status = domain.ProjectStatusError
|
|
continue
|
|
}
|
|
p.Status = status
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// discoverProjects finds projects from pods with rdev labels.
|
|
func (r *ProjectRepository) discoverProjects(ctx context.Context) error {
|
|
if r.client == nil {
|
|
return fmt.Errorf("k8s client not available")
|
|
}
|
|
|
|
// List pods with the rdev project label
|
|
labelSelector := fmt.Sprintf("%s=true", domain.LabelProject)
|
|
|
|
pods, err := r.client.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: labelSelector,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("list pods: %w", err)
|
|
}
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Track which projects we've seen to detect deletions
|
|
seen := make(map[domain.ProjectID]bool)
|
|
|
|
for _, pod := range pods.Items {
|
|
project := r.podToProject(&pod)
|
|
if project != nil {
|
|
seen[project.ID] = true
|
|
r.projects[project.ID] = project
|
|
r.logger.Debug("discovered project from pod",
|
|
"project_id", project.ID,
|
|
"pod_name", project.PodName,
|
|
"status", project.Status)
|
|
}
|
|
}
|
|
|
|
// Remove projects whose pods no longer exist (but keep fallback projects if no K8s client)
|
|
for id := range r.projects {
|
|
if !seen[id] {
|
|
// Only remove if we have at least one discovered project
|
|
// This prevents removing all fallback projects when K8s is unavailable
|
|
if len(seen) > 0 {
|
|
r.logger.Info("removing project (pod deleted)", "project_id", id)
|
|
delete(r.projects, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// podToProject converts a K8s pod to a domain.Project.
|
|
// Returns nil if the pod doesn't have the required labels.
|
|
func (r *ProjectRepository) podToProject(pod *corev1.Pod) *domain.Project {
|
|
labels := pod.Labels
|
|
annotations := pod.Annotations
|
|
|
|
// Check for required labels
|
|
if labels[domain.LabelProject] != "true" {
|
|
return nil
|
|
}
|
|
|
|
name := labels[domain.LabelName]
|
|
if name == "" {
|
|
// Fallback to pod name if name label is missing
|
|
name = pod.Name
|
|
}
|
|
|
|
workspace := labels[domain.LabelWorkspace]
|
|
if workspace == "" {
|
|
workspace = "/workspace" // Default workspace
|
|
}
|
|
|
|
description := ""
|
|
if annotations != nil {
|
|
description = annotations[domain.AnnotDescription]
|
|
}
|
|
|
|
// Convert pod phase to project status
|
|
status := r.phaseToStatus(pod.Status.Phase)
|
|
|
|
return &domain.Project{
|
|
ID: domain.ProjectID(name),
|
|
Name: capitalizeFirst(name),
|
|
Description: description,
|
|
PodName: pod.Name,
|
|
Status: status,
|
|
Workspace: workspace,
|
|
}
|
|
}
|
|
|
|
// phaseToStatus converts K8s pod phase to domain.ProjectStatus.
|
|
func (r *ProjectRepository) phaseToStatus(phase corev1.PodPhase) domain.ProjectStatus {
|
|
switch phase {
|
|
case corev1.PodRunning:
|
|
return domain.ProjectStatusRunning
|
|
case corev1.PodPending:
|
|
return domain.ProjectStatusPending
|
|
case corev1.PodFailed:
|
|
return domain.ProjectStatusFailed
|
|
case corev1.PodSucceeded:
|
|
// Succeeded is a terminal state, treat as not available
|
|
return domain.ProjectStatusNotFound
|
|
default:
|
|
return domain.ProjectStatusUnknown
|
|
}
|
|
}
|
|
|
|
// StartWatching begins watching for pod changes in the background.
|
|
// Call StopWatching to stop the watch.
|
|
func (r *ProjectRepository) StartWatching(ctx context.Context) error {
|
|
if r.client == nil {
|
|
return fmt.Errorf("k8s client not available for watching")
|
|
}
|
|
|
|
// Create a cancellable context for the watch
|
|
watchCtx, cancel := context.WithCancel(ctx)
|
|
r.watchCancel = cancel
|
|
|
|
r.watchWg.Add(1)
|
|
go r.watchLoop(watchCtx)
|
|
|
|
r.logger.Info("started watching for project pod changes", "namespace", r.namespace)
|
|
return nil
|
|
}
|
|
|
|
// StopWatching stops the background pod watch.
|
|
func (r *ProjectRepository) StopWatching() {
|
|
if r.watchCancel != nil {
|
|
r.watchCancel()
|
|
r.watchWg.Wait()
|
|
r.watchCancel = nil
|
|
r.logger.Info("stopped watching for project pod changes")
|
|
}
|
|
}
|
|
|
|
// watchLoop continuously watches for pod changes.
|
|
func (r *ProjectRepository) watchLoop(ctx context.Context) {
|
|
defer r.watchWg.Done()
|
|
|
|
labelSelector := fmt.Sprintf("%s=true", domain.LabelProject)
|
|
backoff := time.Second
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
watcher, err := r.client.CoreV1().Pods(r.namespace).Watch(ctx, metav1.ListOptions{
|
|
LabelSelector: labelSelector,
|
|
})
|
|
if err != nil {
|
|
r.logger.Error("failed to create pod watch", "error", err)
|
|
time.Sleep(backoff)
|
|
backoff = min(backoff*2, time.Minute)
|
|
continue
|
|
}
|
|
|
|
backoff = time.Second // Reset backoff on successful connection
|
|
|
|
r.handleWatchEvents(ctx, watcher)
|
|
watcher.Stop()
|
|
}
|
|
}
|
|
|
|
// handleWatchEvents processes events from the pod watcher.
|
|
func (r *ProjectRepository) handleWatchEvents(ctx context.Context, watcher watch.Interface) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case event, ok := <-watcher.ResultChan():
|
|
if !ok {
|
|
// Watch channel closed, need to reconnect
|
|
r.logger.Debug("watch channel closed, reconnecting")
|
|
return
|
|
}
|
|
|
|
pod, ok := event.Object.(*corev1.Pod)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
switch event.Type {
|
|
case watch.Added, watch.Modified:
|
|
project := r.podToProject(pod)
|
|
if project != nil {
|
|
r.mu.Lock()
|
|
existing, exists := r.projects[project.ID]
|
|
if !exists || existing.Status != project.Status {
|
|
r.logger.Info("project updated",
|
|
"event", event.Type,
|
|
"project_id", project.ID,
|
|
"status", project.Status)
|
|
}
|
|
r.projects[project.ID] = project
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
case watch.Deleted:
|
|
project := r.podToProject(pod)
|
|
if project != nil {
|
|
r.mu.Lock()
|
|
r.logger.Info("project removed", "project_id", project.ID)
|
|
delete(r.projects, project.ID)
|
|
r.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// getPodStatus queries the status of a pod using kubectl (fallback method).
|
|
func (r *ProjectRepository) getPodStatus(ctx context.Context, podName string) (domain.ProjectStatus, error) {
|
|
// If we have a K8s client, use it directly
|
|
if r.client != nil {
|
|
pod, err := r.client.CoreV1().Pods(r.namespace).Get(ctx, podName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if k8serrors.IsNotFound(err) {
|
|
return domain.ProjectStatusNotFound, nil
|
|
}
|
|
return domain.ProjectStatusUnknown, fmt.Errorf("get pod: %w", err)
|
|
}
|
|
return r.phaseToStatus(pod.Status.Phase), nil
|
|
}
|
|
|
|
// Fallback to kubectl for local development
|
|
cmd := exec.CommandContext(ctx, "kubectl",
|
|
"get", "pod", podName,
|
|
"-n", r.namespace,
|
|
"-o", "jsonpath={.status.phase}",
|
|
)
|
|
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
// Check if pod doesn't exist
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return domain.ProjectStatusNotFound, nil
|
|
}
|
|
return domain.ProjectStatusUnknown, fmt.Errorf("get pod status: %w", err)
|
|
}
|
|
|
|
phase := strings.ToLower(strings.TrimSpace(string(output)))
|
|
switch phase {
|
|
case "running":
|
|
return domain.ProjectStatusRunning, nil
|
|
case "pending":
|
|
return domain.ProjectStatusPending, nil
|
|
case "failed":
|
|
return domain.ProjectStatusFailed, nil
|
|
default:
|
|
return domain.ProjectStatusUnknown, nil
|
|
}
|
|
}
|
|
|
|
// capitalizeFirst returns s with its first character upper-cased and the
// remainder unchanged. The empty string is returned as-is.
//
// The first character is taken as a whole UTF-8 rune, not a single byte, so
// a multi-byte leading character such as "é" is upper-cased correctly
// instead of being split into invalid bytes.
func capitalizeFirst(s string) string {
	// Ranging over a string yields the byte offset of each rune; the first
	// offset greater than zero is where the second rune starts.
	for i := range s {
		if i > 0 {
			return strings.ToUpper(s[:i]) + s[i:]
		}
	}
	// Zero or one rune: upper-case the whole string.
	return strings.ToUpper(s)
}
|