Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
- Add UndeployAll() using label selectors to clean up monorepo components on project deletion (replaces name-based Undeploy in DeleteProject and the direct undeploy handler) - Add ResourceGC background worker that periodically finds K8s resources whose project label has no matching DB record, deletes after 1h safety window - Widen deployer client type from *kubernetes.Clientset to kubernetes.Interface for testability - UndeployAll accumulates errors via errors.Join instead of failing fast - Add checkout/checkin sidecar dev flow: temporary git tokens, branch checkout, review on checkin with cleanup workers - Add interactive sessions: pod binding, command execution, SSE streaming, ephemeral preview URLs with session cleanup workers - Add GET /workers/pool endpoint for aggregate capacity and queue depth - Add sessions:read and sessions:execute auth scopes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
405 lines
12 KiB
Go
405 lines
12 KiB
Go
// Package deployer provides a Kubernetes deployment adapter implementing port.Deployer.
|
|
package deployer
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
k8serr "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// Ensure Deployer implements port.Deployer.
|
|
var _ port.Deployer = (*Deployer)(nil)
|
|
|
|
// Config carries the settings a Deployer needs.
//
// The constructors replace zero values with fallbacks: Namespace becomes
// "projects", DefaultReplicas becomes 1 and IngressClass becomes "traefik".
// TLSIssuer and DefaultDomain have no fallback.
type Config struct {
	// Namespace is the Kubernetes namespace that project resources live in.
	Namespace string

	// DefaultReplicas is used when a deploy spec does not request a replica count.
	DefaultReplicas int

	// IngressClass names the ingress controller class, for example "traefik".
	IngressClass string

	// TLSIssuer names the cert-manager issuer.
	TLSIssuer string

	// DefaultDomain is the base domain used to build "<project>.<DefaultDomain>"
	// when a deploy spec carries no explicit domain.
	DefaultDomain string
}
|
|
|
|
// Deployer manages Kubernetes deployments for projects.
|
|
type Deployer struct {
|
|
client kubernetes.Interface
|
|
ingressClient IngressClient
|
|
config Config
|
|
}
|
|
|
|
// NewDeployer creates a new Deployer.
|
|
func NewDeployer(client kubernetes.Interface, cfg Config) *Deployer {
|
|
if cfg.DefaultReplicas == 0 {
|
|
cfg.DefaultReplicas = 1
|
|
}
|
|
if cfg.IngressClass == "" {
|
|
cfg.IngressClass = "traefik"
|
|
}
|
|
if cfg.Namespace == "" {
|
|
cfg.Namespace = "projects"
|
|
}
|
|
return &Deployer{
|
|
client: client,
|
|
ingressClient: &k8sIngressClient{clientset: client},
|
|
config: cfg,
|
|
}
|
|
}
|
|
|
|
// NewDeployerWithIngressClient creates a Deployer with a custom IngressClient for testing.
|
|
func NewDeployerWithIngressClient(client kubernetes.Interface, ingressClient IngressClient, cfg Config) *Deployer {
|
|
if cfg.DefaultReplicas == 0 {
|
|
cfg.DefaultReplicas = 1
|
|
}
|
|
if cfg.IngressClass == "" {
|
|
cfg.IngressClass = "traefik"
|
|
}
|
|
if cfg.Namespace == "" {
|
|
cfg.Namespace = "projects"
|
|
}
|
|
return &Deployer{
|
|
client: client,
|
|
ingressClient: ingressClient,
|
|
config: cfg,
|
|
}
|
|
}
|
|
|
|
// Deploy creates or updates a deployment for a project.
|
|
func (d *Deployer) Deploy(ctx context.Context, spec domain.DeploySpec) error {
|
|
// Validate spec
|
|
if spec.ProjectName == "" {
|
|
return fmt.Errorf("project name is required")
|
|
}
|
|
if spec.Image == "" {
|
|
return fmt.Errorf("image is required")
|
|
}
|
|
|
|
// Set defaults
|
|
if spec.Port == 0 {
|
|
spec.Port = 8080
|
|
}
|
|
if spec.Replicas == 0 {
|
|
spec.Replicas = d.config.DefaultReplicas
|
|
}
|
|
if spec.Domain == "" {
|
|
spec.Domain = spec.ProjectName + "." + d.config.DefaultDomain
|
|
}
|
|
|
|
// Create namespace if it doesn't exist
|
|
if err := d.ensureNamespace(ctx); err != nil {
|
|
return fmt.Errorf("failed to ensure namespace: %w", err)
|
|
}
|
|
|
|
// Create or update Secret for env vars
|
|
if len(spec.Secrets) > 0 {
|
|
if err := d.createOrUpdateSecret(ctx, spec); err != nil {
|
|
return fmt.Errorf("failed to create secret: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create or update Deployment
|
|
if err := d.createOrUpdateDeployment(ctx, spec); err != nil {
|
|
return fmt.Errorf("failed to create deployment: %w", err)
|
|
}
|
|
|
|
// Create or update Service
|
|
if err := d.createOrUpdateService(ctx, spec); err != nil {
|
|
return fmt.Errorf("failed to create service: %w", err)
|
|
}
|
|
|
|
// Create or update Ingress for single-app projects only.
|
|
// Monorepo components (with ComponentPath set) use unified project-level Ingress
|
|
// managed via AddIngressPath instead.
|
|
if spec.ComponentPath == "" {
|
|
if err := d.createOrUpdateIngress(ctx, spec); err != nil {
|
|
return fmt.Errorf("failed to create ingress: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Undeploy removes all deployment resources for a project.
|
|
func (d *Deployer) Undeploy(ctx context.Context, projectName string) error {
|
|
ns := d.config.Namespace
|
|
|
|
// Delete Ingress
|
|
err := d.client.NetworkingV1().Ingresses(ns).Delete(ctx, projectName, metav1.DeleteOptions{})
|
|
if err != nil && !k8serr.IsNotFound(err) {
|
|
return fmt.Errorf("failed to delete ingress: %w", err)
|
|
}
|
|
|
|
// Delete Service
|
|
err = d.client.CoreV1().Services(ns).Delete(ctx, projectName, metav1.DeleteOptions{})
|
|
if err != nil && !k8serr.IsNotFound(err) {
|
|
return fmt.Errorf("failed to delete service: %w", err)
|
|
}
|
|
|
|
// Delete Deployment
|
|
err = d.client.AppsV1().Deployments(ns).Delete(ctx, projectName, metav1.DeleteOptions{})
|
|
if err != nil && !k8serr.IsNotFound(err) {
|
|
return fmt.Errorf("failed to delete deployment: %w", err)
|
|
}
|
|
|
|
// Delete Secret
|
|
err = d.client.CoreV1().Secrets(ns).Delete(ctx, projectName+"-env", metav1.DeleteOptions{})
|
|
if err != nil && !k8serr.IsNotFound(err) {
|
|
return fmt.Errorf("failed to delete secret: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UndeployAll removes all deployment resources matching the project label.
|
|
// Unlike Undeploy which deletes by exact name, this uses label selectors to find
|
|
// and delete all resources (including monorepo components like {project}-{component}).
|
|
// Errors are accumulated so that a single resource failure doesn't prevent cleanup of others.
|
|
func (d *Deployer) UndeployAll(ctx context.Context, projectName string) error {
|
|
ns := d.config.Namespace
|
|
selector := fmt.Sprintf("project=%s", projectName)
|
|
propagation := metav1.DeletePropagationForeground
|
|
deleteOpts := metav1.DeleteOptions{PropagationPolicy: &propagation}
|
|
listOpts := metav1.ListOptions{LabelSelector: selector}
|
|
|
|
var errs []error
|
|
|
|
// Delete Ingresses
|
|
ingresses, err := d.client.NetworkingV1().Ingresses(ns).List(ctx, listOpts)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to list ingresses: %w", err))
|
|
} else {
|
|
for _, ing := range ingresses.Items {
|
|
if err := d.client.NetworkingV1().Ingresses(ns).Delete(ctx, ing.Name, deleteOpts); err != nil && !k8serr.IsNotFound(err) {
|
|
errs = append(errs, fmt.Errorf("failed to delete ingress %s: %w", ing.Name, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete Services
|
|
services, err := d.client.CoreV1().Services(ns).List(ctx, listOpts)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to list services: %w", err))
|
|
} else {
|
|
for _, svc := range services.Items {
|
|
if err := d.client.CoreV1().Services(ns).Delete(ctx, svc.Name, deleteOpts); err != nil && !k8serr.IsNotFound(err) {
|
|
errs = append(errs, fmt.Errorf("failed to delete service %s: %w", svc.Name, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete Deployments
|
|
deployments, err := d.client.AppsV1().Deployments(ns).List(ctx, listOpts)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to list deployments: %w", err))
|
|
} else {
|
|
for _, dep := range deployments.Items {
|
|
if err := d.client.AppsV1().Deployments(ns).Delete(ctx, dep.Name, deleteOpts); err != nil && !k8serr.IsNotFound(err) {
|
|
errs = append(errs, fmt.Errorf("failed to delete deployment %s: %w", dep.Name, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete Secrets
|
|
secrets, err := d.client.CoreV1().Secrets(ns).List(ctx, listOpts)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to list secrets: %w", err))
|
|
} else {
|
|
for _, sec := range secrets.Items {
|
|
if err := d.client.CoreV1().Secrets(ns).Delete(ctx, sec.Name, deleteOpts); err != nil && !k8serr.IsNotFound(err) {
|
|
errs = append(errs, fmt.Errorf("failed to delete secret %s: %w", sec.Name, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
return errors.Join(errs...)
|
|
}
|
|
|
|
// ListProjectLabels returns unique project label values from all deployments in the namespace.
|
|
// This is used by the GC reconciliation worker to discover orphaned resources.
|
|
func (d *Deployer) ListProjectLabels(ctx context.Context) ([]string, error) {
|
|
ns := d.config.Namespace
|
|
|
|
deployments, err := d.client.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{
|
|
LabelSelector: "project",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list deployments: %w", err)
|
|
}
|
|
|
|
seen := make(map[string]struct{})
|
|
var labels []string
|
|
for _, dep := range deployments.Items {
|
|
project := dep.Labels["project"]
|
|
if project == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[project]; !ok {
|
|
seen[project] = struct{}{}
|
|
labels = append(labels, project)
|
|
}
|
|
}
|
|
return labels, nil
|
|
}
|
|
|
|
// GetOldestResourceTime returns the creation time of the oldest deployment
|
|
// matching the given project label. Returns false if no resources exist.
|
|
func (d *Deployer) GetOldestResourceTime(ctx context.Context, projectName string) (time.Time, bool, error) {
|
|
ns := d.config.Namespace
|
|
|
|
deployments, err := d.client.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{
|
|
LabelSelector: fmt.Sprintf("project=%s", projectName),
|
|
})
|
|
if err != nil {
|
|
return time.Time{}, false, fmt.Errorf("failed to list deployments: %w", err)
|
|
}
|
|
if len(deployments.Items) == 0 {
|
|
return time.Time{}, false, nil
|
|
}
|
|
|
|
oldest := deployments.Items[0].CreationTimestamp.Time
|
|
for _, dep := range deployments.Items[1:] {
|
|
if dep.CreationTimestamp.Time.Before(oldest) {
|
|
oldest = dep.CreationTimestamp.Time
|
|
}
|
|
}
|
|
return oldest, true, nil
|
|
}
|
|
|
|
// GetStatus returns the current deployment status for a project.
|
|
func (d *Deployer) GetStatus(ctx context.Context, projectName string) (*domain.DeployStatus, error) {
|
|
ns := d.config.Namespace
|
|
|
|
deployment, err := d.client.AppsV1().Deployments(ns).Get(ctx, projectName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if k8serr.IsNotFound(err) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("failed to get deployment: %w", err)
|
|
}
|
|
|
|
// Determine status
|
|
var status domain.DeploymentStatus
|
|
switch {
|
|
case deployment.Status.ReadyReplicas == *deployment.Spec.Replicas:
|
|
status = domain.DeploymentStatusRunning
|
|
case deployment.Status.UnavailableReplicas > 0:
|
|
status = domain.DeploymentStatusFailed
|
|
case deployment.Status.ReadyReplicas < *deployment.Spec.Replicas:
|
|
status = domain.DeploymentStatusPending
|
|
default:
|
|
status = domain.DeploymentStatusUnknown
|
|
}
|
|
|
|
// Get URL from ingress
|
|
var url string
|
|
ingress, err := d.client.NetworkingV1().Ingresses(ns).Get(ctx, projectName, metav1.GetOptions{})
|
|
if err == nil && len(ingress.Spec.Rules) > 0 {
|
|
host := ingress.Spec.Rules[0].Host
|
|
url = "https://" + host
|
|
}
|
|
|
|
return &domain.DeployStatus{
|
|
ProjectName: projectName,
|
|
Image: deployment.Spec.Template.Spec.Containers[0].Image,
|
|
Replicas: int(*deployment.Spec.Replicas),
|
|
ReadyReplicas: int(deployment.Status.ReadyReplicas),
|
|
URL: url,
|
|
Status: status,
|
|
CreatedAt: deployment.CreationTimestamp.Time,
|
|
UpdatedAt: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// Restart triggers a rolling restart of the deployment.
|
|
func (d *Deployer) Restart(ctx context.Context, projectName string) error {
|
|
ns := d.config.Namespace
|
|
|
|
deployment, err := d.client.AppsV1().Deployments(ns).Get(ctx, projectName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get deployment: %w", err)
|
|
}
|
|
|
|
// Add annotation to trigger rollout
|
|
if deployment.Spec.Template.Annotations == nil {
|
|
deployment.Spec.Template.Annotations = make(map[string]string)
|
|
}
|
|
deployment.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
|
|
|
|
_, err = d.client.AppsV1().Deployments(ns).Update(ctx, deployment, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update deployment: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Scale adjusts the replica count for a deployment.
|
|
func (d *Deployer) Scale(ctx context.Context, projectName string, replicas int) error {
|
|
ns := d.config.Namespace
|
|
|
|
scale, err := d.client.AppsV1().Deployments(ns).GetScale(ctx, projectName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get scale: %w", err)
|
|
}
|
|
|
|
scale.Spec.Replicas = int32(replicas)
|
|
|
|
_, err = d.client.AppsV1().Deployments(ns).UpdateScale(ctx, projectName, scale, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update scale: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetLogs returns recent logs from the deployment pods.
|
|
func (d *Deployer) GetLogs(ctx context.Context, projectName string, tailLines int) (string, error) {
|
|
ns := d.config.Namespace
|
|
|
|
// List pods for the deployment
|
|
pods, err := d.client.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
|
|
LabelSelector: fmt.Sprintf("app=%s", projectName),
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to list pods: %w", err)
|
|
}
|
|
|
|
if len(pods.Items) == 0 {
|
|
return "", fmt.Errorf("no pods found for project %s", projectName)
|
|
}
|
|
|
|
// Get logs from the first pod
|
|
tail := int64(tailLines)
|
|
opts := &corev1.PodLogOptions{
|
|
TailLines: &tail,
|
|
}
|
|
|
|
req := d.client.CoreV1().Pods(ns).GetLogs(pods.Items[0].Name, opts)
|
|
logs, err := req.Stream(ctx)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get logs: %w", err)
|
|
}
|
|
defer func() { _ = logs.Close() }()
|
|
|
|
buf := new(bytes.Buffer)
|
|
_, err = buf.ReadFrom(logs)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to read logs: %w", err)
|
|
}
|
|
|
|
return buf.String(), nil
|
|
}
|