// Package executor provides kubectl exec functionality for running commands in pods. package executor import ( "bufio" "context" "fmt" "io" "os/exec" "sync" "time" ) // Executor runs commands in Kubernetes pods via kubectl exec. type Executor struct { namespace string mu sync.RWMutex } // New creates a new Executor for the given namespace. func New(namespace string) *Executor { return &Executor{ namespace: namespace, } } // CommandType represents the type of command being executed. type CommandType string const ( CommandTypeClaude CommandType = "claude" CommandTypeShell CommandType = "shell" CommandTypeGit CommandType = "git" ) // Command represents a command to execute in a pod. type Command struct { ID string PodName string Type CommandType Args []string StartedAt time.Time } // Result represents the result of command execution. type Result struct { ExitCode int DurationMs int64 Error error } // OutputHandler is called for each line of output from the command. type OutputHandler func(stream string, line string) // Exec executes a command in the specified pod. // It streams output to the provided handler and returns when complete. func (e *Executor) Exec(ctx context.Context, cmd *Command, handler OutputHandler) Result { e.mu.RLock() namespace := e.namespace e.mu.RUnlock() startTime := time.Now() var args []string switch cmd.Type { case CommandTypeClaude: // claude "prompt" args = []string{ "exec", "-n", namespace, cmd.PodName, "--", "claude", cmd.Args[0], // prompt is first arg } case CommandTypeShell: // bash -c "command" args = []string{ "exec", "-n", namespace, cmd.PodName, "--", "bash", "-c", cmd.Args[0], // command is first arg } case CommandTypeGit: // git args = append([]string{ "exec", "-n", namespace, cmd.PodName, "--", "git", "-C", "/workspace", }, cmd.Args...) default: return Result{ ExitCode: 1, Error: fmt.Errorf("unknown command type: %s", cmd.Type), } } // Create the kubectl command kubectl := exec.CommandContext(ctx, "kubectl", args...) // Get stdout and stderr pipes stdout, err := kubectl.StdoutPipe() if err != nil { return Result{ExitCode: 1, Error: fmt.Errorf("stdout pipe: %w", err)} } stderr, err := kubectl.StderrPipe() if err != nil { return Result{ExitCode: 1, Error: fmt.Errorf("stderr pipe: %w", err)} } // Start the command if err := kubectl.Start(); err != nil { return Result{ExitCode: 1, Error: fmt.Errorf("start: %w", err)} } // Stream output concurrently var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() streamOutput(stdout, "stdout", handler) }() go func() { defer wg.Done() streamOutput(stderr, "stderr", handler) }() // Wait for output to be consumed wg.Wait() // Wait for command to complete err = kubectl.Wait() duration := time.Since(startTime) result := Result{ DurationMs: duration.Milliseconds(), } if err != nil { if exitError, ok := err.(*exec.ExitError); ok { result.ExitCode = exitError.ExitCode() } else { result.ExitCode = 1 result.Error = err } } return result } // streamOutput reads from a reader and sends each line to the handler. func streamOutput(r io.Reader, stream string, handler OutputHandler) { scanner := bufio.NewScanner(r) // Increase buffer size for long lines buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) for scanner.Scan() { handler(stream, scanner.Text()) } } // CheckConnection verifies kubectl can connect to the cluster. func (e *Executor) CheckConnection(ctx context.Context) error { cmd := exec.CommandContext(ctx, "kubectl", "cluster-info", "--request-timeout=5s") return cmd.Run() } // PodExists checks if a pod exists and is running. func (e *Executor) PodExists(ctx context.Context, podName string) (bool, error) { e.mu.RLock() namespace := e.namespace e.mu.RUnlock() cmd := exec.CommandContext(ctx, "kubectl", "get", "pod", podName, "-n", namespace, "-o", "jsonpath={.status.phase}", ) output, err := cmd.Output() if err != nil { // Pod doesn't exist or error return false, nil } return string(output) == "Running", nil }