rdev/internal/executor/executor.go
jordan 0960b17eb2 feat: Implement v0.2-v0.4 (workspaces, git, API)
v0.2 - Real Workspaces:
- Project-specific claudebox StatefulSets (pantheon, aeries)
- Init containers for git clone via SSH
- Deploy key secrets template
- Project ConfigMaps for CLAUDE.md

v0.3 - Git Integration:
- Dockerfile with rdev-bot git identity
- openssh-client for SSH operations
- Image version bump to v0.3.0

v0.4 - API Server:
- Go REST API with chi router
- Endpoints: /projects, /claude, /shell, /git, /events
- SSE streaming for real-time output
- OpenAPI docs via Scalar at /docs
- Kubernetes RBAC for pod exec
- Executor and project registry packages

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 21:07:00 -07:00

184 lines
4.1 KiB
Go

// Package executor provides kubectl exec functionality for running commands in pods.
package executor
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"sync"
"time"
)
// Executor runs commands in Kubernetes pods via kubectl exec.
type Executor struct {
namespace string
mu sync.RWMutex
}
// New creates a new Executor for the given namespace.
func New(namespace string) *Executor {
return &Executor{
namespace: namespace,
}
}
// CommandType represents the type of command being executed.
type CommandType string
const (
CommandTypeClaude CommandType = "claude"
CommandTypeShell CommandType = "shell"
CommandTypeGit CommandType = "git"
)
// Command represents a command to execute in a pod.
type Command struct {
ID string
PodName string
Type CommandType
Args []string
StartedAt time.Time
}
// Result represents the result of command execution.
type Result struct {
ExitCode int
DurationMs int64
Error error
}
// OutputHandler is called for each line of output from the command.
type OutputHandler func(stream string, line string)
// Exec executes a command in the specified pod.
// It streams output to the provided handler and returns when complete.
func (e *Executor) Exec(ctx context.Context, cmd *Command, handler OutputHandler) Result {
e.mu.RLock()
namespace := e.namespace
e.mu.RUnlock()
startTime := time.Now()
var args []string
switch cmd.Type {
case CommandTypeClaude:
// claude "prompt"
args = []string{
"exec", "-n", namespace, cmd.PodName, "--",
"claude", cmd.Args[0], // prompt is first arg
}
case CommandTypeShell:
// bash -c "command"
args = []string{
"exec", "-n", namespace, cmd.PodName, "--",
"bash", "-c", cmd.Args[0], // command is first arg
}
case CommandTypeGit:
// git <args...>
args = append([]string{
"exec", "-n", namespace, cmd.PodName, "--",
"git", "-C", "/workspace",
}, cmd.Args...)
default:
return Result{
ExitCode: 1,
Error: fmt.Errorf("unknown command type: %s", cmd.Type),
}
}
// Create the kubectl command
kubectl := exec.CommandContext(ctx, "kubectl", args...)
// Get stdout and stderr pipes
stdout, err := kubectl.StdoutPipe()
if err != nil {
return Result{ExitCode: 1, Error: fmt.Errorf("stdout pipe: %w", err)}
}
stderr, err := kubectl.StderrPipe()
if err != nil {
return Result{ExitCode: 1, Error: fmt.Errorf("stderr pipe: %w", err)}
}
// Start the command
if err := kubectl.Start(); err != nil {
return Result{ExitCode: 1, Error: fmt.Errorf("start: %w", err)}
}
// Stream output concurrently
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
streamOutput(stdout, "stdout", handler)
}()
go func() {
defer wg.Done()
streamOutput(stderr, "stderr", handler)
}()
// Wait for output to be consumed
wg.Wait()
// Wait for command to complete
err = kubectl.Wait()
duration := time.Since(startTime)
result := Result{
DurationMs: duration.Milliseconds(),
}
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
result.ExitCode = exitError.ExitCode()
} else {
result.ExitCode = 1
result.Error = err
}
}
return result
}
// streamOutput reads from a reader and sends each line to the handler.
func streamOutput(r io.Reader, stream string, handler OutputHandler) {
scanner := bufio.NewScanner(r)
// Increase buffer size for long lines
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
handler(stream, scanner.Text())
}
}
// CheckConnection verifies kubectl can connect to the cluster.
func (e *Executor) CheckConnection(ctx context.Context) error {
cmd := exec.CommandContext(ctx, "kubectl", "cluster-info", "--request-timeout=5s")
return cmd.Run()
}
// PodExists checks if a pod exists and is running.
func (e *Executor) PodExists(ctx context.Context, podName string) (bool, error) {
e.mu.RLock()
namespace := e.namespace
e.mu.RUnlock()
cmd := exec.CommandContext(ctx, "kubectl",
"get", "pod", podName,
"-n", namespace,
"-o", "jsonpath={.status.phase}",
)
output, err := cmd.Output()
if err != nil {
// Pod doesn't exist or error
return false, nil
}
return string(output) == "Running", nil
}