package kubernetes import ( "context" "fmt" "log/slog" "os/exec" "strings" "sync" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" ) // ProjectRepository implements port.ProjectRepository using Kubernetes. type ProjectRepository struct { namespace string client *kubernetes.Clientset logger *slog.Logger projects map[domain.ProjectID]*domain.Project mu sync.RWMutex // Watch management watchCancel context.CancelFunc watchWg sync.WaitGroup } // NewProjectRepository creates a new Kubernetes project repository. // If client is nil, falls back to hardcoded projects (for local development). func NewProjectRepository(namespace string) *ProjectRepository { return NewProjectRepositoryWithClient(namespace, nil, nil) } // NewProjectRepositoryWithClient creates a new Kubernetes project repository // with an optional K8s client for dynamic project discovery. func NewProjectRepositoryWithClient(namespace string, client *kubernetes.Clientset, logger *slog.Logger) *ProjectRepository { if logger == nil { logger = slog.Default() } r := &ProjectRepository{ namespace: namespace, client: client, logger: logger, projects: make(map[domain.ProjectID]*domain.Project), } // Initialize with fallback hardcoded projects // These will be replaced by discovered projects if K8s client is available r.initFallbackProjects() return r } // initFallbackProjects adds hardcoded projects for when K8s client is unavailable. // Currently empty - projects are discovered dynamically from K8s or stored in the database. func (r *ProjectRepository) initFallbackProjects() { // No hardcoded fallback projects } // Ensure ProjectRepository implements port.ProjectRepository at compile time. var _ port.ProjectRepository = (*ProjectRepository)(nil) // List returns all available projects. 
func (r *ProjectRepository) List(ctx context.Context) ([]domain.Project, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	projects := make([]domain.Project, 0, len(r.projects))
	for _, p := range r.projects {
		projects = append(projects, *p)
	}
	return projects, nil
}

// Get returns a shallow copy of the project with the given ID, or
// domain.ErrProjectNotFound if it is unknown.
//
// A copy is returned (as List does) so callers never alias the repository's
// internal *domain.Project, which is otherwise updated under r.mu.
func (r *ProjectRepository) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	p, ok := r.projects[id]
	if !ok {
		return nil, domain.ErrProjectNotFound
	}
	cp := *p
	return &cp, nil
}

// Exists reports whether a project with the given ID is known.
func (r *ProjectRepository) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	_, ok := r.projects[id]
	return ok, nil
}

// Register adds (or replaces) a project in the repository, keyed by its ID.
//
// A shallow copy is stored so that later changes the caller makes to project
// do not race with readers of the repository. A nil project is rejected.
func (r *ProjectRepository) Register(ctx context.Context, project *domain.Project) error {
	if project == nil {
		return fmt.Errorf("register project: nil project")
	}
	cp := *project

	r.mu.Lock()
	defer r.mu.Unlock()

	r.projects[cp.ID] = &cp
	return nil
}

// Unregister removes a project from the repository. Removing an unknown ID is
// not an error.
func (r *ProjectRepository) Unregister(ctx context.Context, id domain.ProjectID) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	delete(r.projects, id)
	return nil
}

// RefreshStatus updates the status of all projects from K8s.
// If a K8s client is available, it first re-discovers projects from pods.
//
// Pod lookups run without holding r.mu so that slow API server / kubectl calls
// do not block readers. A project whose lookup fails is marked
// domain.ProjectStatusError. If ctx is cancelled mid-refresh, ctx.Err() is
// returned and no statuses are changed, rather than marking every project as
// errored because of the caller's cancellation.
func (r *ProjectRepository) RefreshStatus(ctx context.Context) error {
	// Try to discover projects from K8s labels first.
	if r.client != nil {
		if err := r.discoverProjects(ctx); err != nil {
			r.logger.Warn("failed to discover projects from K8s, using fallback", "error", err)
		}
	}

	// Snapshot which pod backs each project.
	r.mu.RLock()
	podNames := make(map[domain.ProjectID]string, len(r.projects))
	for id, p := range r.projects {
		podNames[id] = p.PodName
	}
	r.mu.RUnlock()

	statuses := make(map[domain.ProjectID]domain.ProjectStatus, len(podNames))
	for id, podName := range podNames {
		if err := ctx.Err(); err != nil {
			return err
		}
		status, err := r.getPodStatus(ctx, podName)
		if err != nil {
			r.logger.Debug("failed to get pod status", "project_id", id, "pod_name", podName, "error", err)
			status = domain.ProjectStatusError
		}
		statuses[id] = status
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	for id, status := range statuses {
		p, ok := r.projects[id]
		if !ok || p.PodName != podNames[id] {
			// Removed or re-pointed at another pod while we were querying.
			continue
		}
		// Replace rather than mutate, so pointers handed out earlier are
		// never written to.
		updated := *p
		updated.Status = status
		r.projects[id] = &updated
	}
	return nil
}

// discoverProjects lists pods carrying the rdev project label
// (domain.LabelProject=true) in r.namespace and syncs the project map with them.
func (r *ProjectRepository) discoverProjects(ctx context.Context) error {
	if r.client == nil {
		return fmt.Errorf("k8s client not available")
	}

	// List pods with the rdev project label.
	labelSelector := fmt.Sprintf("%s=true", domain.LabelProject)
	pods, err := r.client.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return fmt.Errorf("list pods: %w", err)
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// Track which projects we've seen so stale entries can be dropped below.
	seen := make(map[domain.ProjectID]struct{}, len(pods.Items))
	for i := range pods.Items {
		pod := &pods.Items[i] // index rather than range-copy the large Pod struct
		project := r.podToProject(pod)
		if project == nil {
			continue
		}
		if _, dup := seen[project.ID]; dup && pod.DeletionTimestamp != nil {
			// Several pods map to this project (e.g. mid-rollout); don't let
			// one that is being deleted displace another.
			continue
		}
		seen[project.ID] = struct{}{}
		r.projects[project.ID] = project
		r.logger.Debug("discovered project from pod",
			"project_id", project.ID,
			"pod_name", project.PodName,
			"status", project.Status)
	}

	// The List call succeeded, so seen is the authoritative set of project
	// pods. Drop every other entry, including Register-ed projects without a
	// pod, and also when zero pods matched, so the last project disappears
	// once its pod is gone. (An unreachable API server returns early above.)
	for id := range r.projects {
		if _, ok := seen[id]; !ok {
			r.logger.Info("removing project (pod deleted)", "project_id", id)
			delete(r.projects, id)
		}
	}

	return nil
}

// podToProject converts a K8s pod to a domain.Project, using the
// domain.LabelName label (or the pod name) as the project ID.
// Returns nil if the pod is not labelled domain.LabelProject=true.
func (r *ProjectRepository) podToProject(pod *corev1.Pod) *domain.Project {
	labels := pod.Labels

	// Only pods explicitly marked as rdev projects are considered.
	if labels[domain.LabelProject] != "true" {
		return nil
	}

	name := labels[domain.LabelName]
	if name == "" {
		// Fall back to the pod name if the name label is missing.
		name = pod.Name
	}

	workspace := labels[domain.LabelWorkspace]
	if workspace == "" {
		workspace = "/workspace" // Default workspace
	}

	// Indexing a nil annotations map yields "", so no nil check is needed.
	description := pod.Annotations[domain.AnnotDescription]

	return &domain.Project{
		ID:          domain.ProjectID(name),
		Name:        capitalizeFirst(name),
		Description: description,
		PodName:     pod.Name,
		Status:      r.phaseToStatus(pod.Status.Phase),
		Workspace:   workspace,
	}
}

// phaseToStatus converts K8s pod phase to domain.ProjectStatus.
// Succeeded (a terminal phase) is reported as not found; any phase not listed,
// including Unknown, maps to domain.ProjectStatusUnknown.
func (r *ProjectRepository) phaseToStatus(phase corev1.PodPhase) domain.ProjectStatus {
	switch phase {
	case corev1.PodRunning:
		return domain.ProjectStatusRunning
	case corev1.PodPending:
		return domain.ProjectStatusPending
	case corev1.PodFailed:
		return domain.ProjectStatusFailed
	case corev1.PodSucceeded:
		// Succeeded is a terminal state, treat as not available
		return domain.ProjectStatusNotFound
	default:
		return domain.ProjectStatusUnknown
	}
}

// StartWatching begins watching for pod changes in the background. The watch
// runs until ctx is cancelled or StopWatching is called.
//
// It returns an error if no K8s client is configured, or if a watch is already
// running. Starting a second watch would overwrite watchCancel, leaving the
// first goroutine uncancellable and StopWatching blocked forever in Wait.
// Call StopWatching before starting again.
func (r *ProjectRepository) StartWatching(ctx context.Context) error {
	if r.client == nil {
		return fmt.Errorf("k8s client not available for watching")
	}
	if r.watchCancel != nil {
		return fmt.Errorf("already watching for project pod changes")
	}

	// Create a cancellable context for the watch.
	watchCtx, cancel := context.WithCancel(ctx)
	r.watchCancel = cancel

	r.watchWg.Add(1)
	go r.watchLoop(watchCtx)

	r.logger.Info("started watching for project pod changes", "namespace", r.namespace)
	return nil
}

// StopWatching cancels the background pod watch started by StartWatching and
// waits for its goroutine to exit. It is a no-op when no watch is running.
func (r *ProjectRepository) StopWatching() {
	if r.watchCancel == nil {
		return
	}
	r.watchCancel()
	r.watchWg.Wait()
	r.watchCancel = nil
	r.logger.Info("stopped watching for project pod changes")
}

// watchLoop keeps a label-filtered pod watch open until ctx is cancelled.
// When the watch cannot be established it retries with exponential backoff
// (1s doubling up to 1m); a successful connection resets the backoff. The
// caller must have incremented r.watchWg before starting it.
func (r *ProjectRepository) watchLoop(ctx context.Context) {
	defer r.watchWg.Done()

	labelSelector := fmt.Sprintf("%s=true", domain.LabelProject)
	backoff := time.Second

	for ctx.Err() == nil {
		watcher, err := r.client.CoreV1().Pods(r.namespace).Watch(ctx, metav1.ListOptions{
			LabelSelector: labelSelector,
		})
		if err != nil {
			if ctx.Err() != nil {
				// Cancelled (e.g. by StopWatching) while connecting; not a failure.
				return
			}
			r.logger.Error("failed to create pod watch", "error", err, "retry_in", backoff)

			// Wait out the backoff, but wake immediately on cancellation so
			// StopWatching is not blocked for up to a minute.
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
			backoff = min(backoff*2, time.Minute)
			continue
		}

		backoff = time.Second // Reset backoff on successful connection
		r.handleWatchEvents(ctx, watcher)
		watcher.Stop()
	}
}

// handleWatchEvents applies pod events from watcher to the project map until
// the result channel closes (watchLoop then reconnects) or ctx is cancelled.
func (r *ProjectRepository) handleWatchEvents(ctx context.Context, watcher watch.Interface) {
	events := watcher.ResultChan()
	for {
		select {
		case <-ctx.Done():
			return

		case event, ok := <-events:
			if !ok {
				// Watch channel closed, need to reconnect.
				r.logger.Debug("watch channel closed, reconnecting")
				return
			}

			if event.Type == watch.Error {
				// Error events carry a *metav1.Status, not a pod; surface them
				// instead of dropping them silently.
				r.logger.Warn("pod watch error event", "error", k8serrors.FromObject(event.Object))
				continue
			}

			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				continue
			}
			project := r.podToProject(pod)
			if project == nil {
				continue
			}

			switch event.Type {
			case watch.Added, watch.Modified:
				r.mu.Lock()
				existing, exists := r.projects[project.ID]
				// A pod that is being deleted (e.g. replaced during a rollout)
				// must not displace an entry that now refers to another pod.
				stale := exists && existing.PodName != pod.Name && pod.DeletionTimestamp != nil
				if !stale {
					if !exists || existing.Status != project.Status {
						r.logger.Info("project updated",
							"event", event.Type,
							"project_id", project.ID,
							"status", project.Status)
					}
					r.projects[project.ID] = project
				}
				r.mu.Unlock()

			case watch.Deleted:
				r.mu.Lock()
				// Only drop the project if it still refers to this pod; a
				// successor pod may already be recorded under the same ID.
				if existing, exists := r.projects[project.ID]; exists && existing.PodName == pod.Name {
					r.logger.Info("project removed", "project_id", project.ID)
					delete(r.projects, project.ID)
				}
				r.mu.Unlock()
			}
		}
	}
}

// getPodStatus reports the status of the named pod. It queries the API server
// through the K8s clientset when one is configured and otherwise falls back to
// shelling out to kubectl (local development).
func (r *ProjectRepository) getPodStatus(ctx context.Context, podName string) (domain.ProjectStatus, error) {
	// If we have a K8s client, use it directly.
	if r.client != nil {
		pod, err := r.client.CoreV1().Pods(r.namespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			if k8serrors.IsNotFound(err) {
				return domain.ProjectStatusNotFound, nil
			}
			return domain.ProjectStatusUnknown, fmt.Errorf("get pod: %w", err)
		}
		return r.phaseToStatus(pod.Status.Phase), nil
	}

	// Fallback to kubectl for local development.
	cmd := exec.CommandContext(ctx, "kubectl",
		"get", "pod", podName,
		"-n", r.namespace,
		"-o", "jsonpath={.status.phase}",
	)
	output, err := cmd.Output()
	if err != nil {
		// kubectl's diagnostics ("pods \"x\" not found") go to stderr, which
		// Output captures in ExitError.Stderr; err.Error() alone is only
		// "exit status N". Inspecting only stderr also keeps a missing kubectl
		// binary ("executable file not found") from looking like a missing pod.
		if exitErr, ok := err.(*exec.ExitError); ok && strings.Contains(string(exitErr.Stderr), "not found") {
			return domain.ProjectStatusNotFound, nil
		}
		return domain.ProjectStatusUnknown, fmt.Errorf("get pod status: %w", err)
	}

	// Map through phaseToStatus so this path agrees with the clientset path
	// (e.g. for Succeeded). Matching is case-insensitive.
	raw := strings.TrimSpace(string(output))
	for _, phase := range []corev1.PodPhase{
		corev1.PodPending,
		corev1.PodRunning,
		corev1.PodSucceeded,
		corev1.PodFailed,
	} {
		if strings.EqualFold(raw, string(phase)) {
			return r.phaseToStatus(phase), nil
		}
	}
	return domain.ProjectStatusUnknown, nil
}

// capitalizeFirst upper-cases the first character (rune) of s and leaves the
// rest unchanged. It splits on a rune boundary, so a multi-byte UTF-8 leading
// character is not cut in half. The empty string is returned unchanged.
func capitalizeFirst(s string) string {
	for i := range s {
		if i > 0 {
			// i is the byte offset of the second rune.
			return strings.ToUpper(s[:i]) + s[i:]
		}
	}
	// Zero or one rune.
	return strings.ToUpper(s)
}