// File: rdev/cmd/rdev-api/main.go
// (Source-viewer listing metadata: Go, 503 lines, 18 KiB.)

// Package main provides the entry point for the rdev API server.
package main
import (
"context"
"log/slog"
"os"
"time"
"github.com/orchard9/rdev/internal/adapter/cloudflare"
"github.com/orchard9/rdev/internal/adapter/cockroach"
"github.com/orchard9/rdev/internal/adapter/codeagent"
"github.com/orchard9/rdev/internal/adapter/codeagent/claudecode"
"github.com/orchard9/rdev/internal/adapter/codeagent/opencode"
"github.com/orchard9/rdev/internal/adapter/deployer"
"github.com/orchard9/rdev/internal/adapter/gitea"
"github.com/orchard9/rdev/internal/adapter/kubernetes"
"github.com/orchard9/rdev/internal/adapter/memory"
"github.com/orchard9/rdev/internal/adapter/postgres"
redisadapter "github.com/orchard9/rdev/internal/adapter/redis"
"github.com/orchard9/rdev/internal/adapter/templates"
"github.com/orchard9/rdev/internal/adapter/woodpecker"
"github.com/orchard9/rdev/internal/auth"
"github.com/orchard9/rdev/internal/db"
"github.com/orchard9/rdev/internal/envutil"
"github.com/orchard9/rdev/internal/handlers"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/middleware"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/service"
"github.com/orchard9/rdev/internal/telemetry"
"github.com/orchard9/rdev/internal/webhook"
"github.com/orchard9/rdev/internal/worker"
"github.com/orchard9/rdev/pkg/api"
)
// version is the build version reported in the "rdev-api starting" log line.
// It defaults to "dev" and is overridden at link time, e.g.:
//
//	go build -ldflags="-X main.version=v0.8.0" ./cmd/rdev-api
//
// The linker's -X flag only affects string variables that are uninitialized or
// initialized to a constant, so keep this a plain package-level string var.
var version = "dev"
// main is the entry point of the rdev API server. It wires every dependency by
// hand (configuration, database, adapters, services, HTTP handlers, background
// workers), registers shutdown hooks, and then blocks in app.Run.
//
// Failures of required components (telemetry, database, webhook dispatcher,
// queue processor, work executor) abort startup with exit status 1. Optional
// integrations (Kubernetes, Gitea, Cloudflare, Woodpecker, templates,
// CockroachDB/Redis provisioners, OpenCode) are skipped when not configured;
// where their constructor can fail, the failure is logged as a warning and
// startup continues without them.
func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	// Initialize telemetry (OpenTelemetry).
	telCfg := telemetry.DefaultConfig()
	telCfg.Logger = logger
	tel, err := telemetry.New(context.Background(), telCfg)
	if err != nil {
		logger.Error("failed to initialize telemetry", "error", err)
		os.Exit(1)
	}

	// fatal logs a startup failure and exits with status 1. os.Exit skips
	// deferred calls and the OnShutdown hook, so telemetry is shut down here
	// (best effort, bounded by a timeout) to give pending data a chance to be
	// exported before the process dies.
	fatal := func(msg string, cause error) {
		logger.Error(msg, "error", cause)
		flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if shutdownErr := tel.Shutdown(flushCtx); shutdownErr != nil {
			logger.Error("telemetry shutdown error", "error", shutdownErr)
		}
		cancel()
		os.Exit(1)
	}

	// Load configuration from the environment.
	cfg := loadConfig()

	// Validate required security configuration.
	if cfg.CredentialEncryptionKey == "" {
		logger.Warn("CREDENTIAL_ENCRYPTION_KEY not set - credential store will use insecure default",
			"hint", "Generate with: openssl rand -base64 32")
		// Deterministic fallback for development only: anything encrypted with
		// this publicly known key is effectively unprotected.
		cfg.CredentialEncryptionKey = "rdev-dev-key-not-for-production"
	}

	// Initialize database with auto-migrations.
	database, err := db.New(db.Config{
		Host:     cfg.DBHost,
		Port:     cfg.DBPort,
		User:     cfg.DBUser,
		Password: cfg.DBPassword,
		Database: cfg.DBName,
		SSLMode:  cfg.DBSSLMode,
	}, logger)
	if err != nil {
		fatal("failed to connect to database", err)
	}
	// NOTE(review): the OnShutdown hook below also closes the database, so this
	// deferred close may run a second time after app.Run returns. That is fine
	// if db's Close is idempotent (database/sql's DB.Close is) — confirm.
	defer func() { _ = database.Close() }()

	// Initialize auth service (hexagonal: repo → service → auth wrapper).
	apiKeyRepo := postgres.NewAPIKeyRepository(database.DB)
	apiKeySvc := service.NewAPIKeyService(apiKeyRepo, cfg.AdminKey)
	authService := auth.NewService(apiKeySvc, cfg.AdminKey)

	// Initialize credential store (for infrastructure secrets).
	credentialStore := postgres.NewCredentialStore(database.DB, cfg.CredentialEncryptionKey)

	// Load infrastructure config from credential store (falls back to env vars).
	infraCfg := loadInfraConfig(context.Background(), credentialStore, cfg, logger)

	// Namespace that hosts the project/agent pods. Every component that talks
	// to those pods must use this value rather than a hard-coded name.
	namespace := envutil.GetEnv("K8S_NAMESPACE", "rdev")

	// Initialize K8s client for dynamic project discovery.
	// Falls back gracefully if K8s is unavailable (e.g., local development).
	k8sClient := kubernetes.NewClientOrNil(kubernetes.ClientConfig{
		Namespace:  namespace,
		Kubeconfig: os.Getenv("KUBECONFIG"),
	})
	if k8sClient != nil {
		logger.Info("k8s client initialized, dynamic project discovery enabled")
	} else {
		logger.Warn("k8s client unavailable, using hardcoded fallback projects")
	}

	projectRepo := kubernetes.NewProjectRepositoryWithClient(namespace, k8sClient, logger)
	k8sExecutor := kubernetes.NewExecutor(namespace)
	streamPub := memory.NewStreamPublisher()
	if k8sClient != nil {
		if err := projectRepo.StartWatching(context.Background()); err != nil {
			logger.Warn("failed to start project watcher", "error", err)
		}
	}

	auditLogger := postgres.NewAuditLogger(database.DB)
	rateLimiter := postgres.NewRateLimiter(database.DB)
	stopRateLimitCleanup := rateLimiter.StartCleanupWorker(context.Background(), 5*time.Minute)
	commandQueue := postgres.NewCommandQueueRepository(database.DB)
	workQueueRepo := postgres.NewWorkQueueRepository(database.DB)
	webhookRepo := postgres.NewWebhookRepository(database.DB)

	webhookDispatcher := webhook.NewDispatcher(webhookRepo, &webhook.DispatcherConfig{
		WorkerCount:  10,
		MaxRetries:   3,
		Timeout:      30 * time.Second,
		RetryBackoff: 5 * time.Second,
		Logger:       logger,
	})
	if err := webhookDispatcher.Start(); err != nil {
		fatal("failed to start webhook dispatcher", err)
	}

	// Infrastructure adapters (optional - only if configured).
	var giteaClient *gitea.Client
	if infraCfg.GiteaToken != "" && infraCfg.GiteaURL != "" {
		giteaClient, err = gitea.NewClient(infraCfg.GiteaURL, infraCfg.GiteaToken, infraCfg.GiteaDefaultOrg)
		if err != nil {
			logger.Warn("failed to initialize gitea client", "error", err)
		} else {
			logger.Info("gitea client initialized", "url", infraCfg.GiteaURL, "org", infraCfg.GiteaDefaultOrg)
		}
	}

	var dnsClient *cloudflare.Client
	if infraCfg.CloudflareToken != "" && infraCfg.CloudflareZoneID != "" {
		dnsClient = cloudflare.NewClient(infraCfg.CloudflareToken, infraCfg.CloudflareZoneID, infraCfg.DefaultDomain)
		logger.Info("cloudflare DNS client initialized", "domain", infraCfg.DefaultDomain)
	}

	var deployerAdapter *deployer.Deployer
	if k8sClient != nil {
		deployerAdapter = deployer.NewDeployer(k8sClient, deployer.Config{
			Namespace:       infraCfg.DeployNamespace,
			IngressClass:    "traefik",
			TLSIssuer:       infraCfg.DeployTLSIssuer,
			DefaultDomain:   infraCfg.DefaultDomain,
			DefaultReplicas: 1,
		})
		logger.Info("deployer initialized", "namespace", infraCfg.DeployNamespace)
	}

	var woodpeckerClient *woodpecker.Client
	if infraCfg.WoodpeckerURL != "" && infraCfg.WoodpeckerAPIToken != "" {
		woodpeckerClient, err = woodpecker.NewClient(
			infraCfg.WoodpeckerURL,
			infraCfg.WoodpeckerAPIToken,
			woodpecker.WithLogger(logger),
		)
		if err != nil {
			logger.Warn("failed to initialize woodpecker client", "error", err)
		} else {
			logger.Info("woodpecker CI client initialized", "url", infraCfg.WoodpeckerURL)
		}
	}

	// Initialize template provider (requires Gitea credentials for seeding repos).
	var templateProvider *templates.Provider
	if infraCfg.GiteaToken != "" && infraCfg.GiteaURL != "" {
		// Pass URL and token directly - provider uses bulk file API for single-commit seeding.
		templateProvider = templates.NewProvider(infraCfg.GiteaURL, infraCfg.GiteaToken, logger)
		logger.Info("template provider initialized")
	}

	// Initialize database provisioner (optional - for project database isolation).
	// The interface is assigned only on success: storing a nil concrete pointer
	// in it would make dbProvisioner != nil (typed-nil interface) and wire a
	// broken provisioner into the service and the shutdown hook.
	var dbProvisioner port.DatabaseProvisioner
	if infraCfg.CRDBHost != "" {
		p, err := cockroach.NewProvisioner(cockroach.Config{
			Host:    infraCfg.CRDBHost,
			Port:    infraCfg.CRDBPort,
			User:    infraCfg.CRDBUser,
			SSLMode: infraCfg.CRDBSSLMode,
		}, logger)
		if err != nil {
			logger.Warn("failed to initialize cockroachdb provisioner", "error", err)
		} else {
			dbProvisioner = p
			logger.Info("cockroachdb provisioner initialized", "host", infraCfg.CRDBHost)
		}
	}

	// Initialize cache provisioner (optional - for project cache isolation via
	// Redis ACLs). Assigned only on success for the same typed-nil reason.
	var cacheProvisioner port.CacheProvisioner
	if infraCfg.RedisHost != "" && infraCfg.RedisPassword != "" {
		p, err := redisadapter.NewProvisioner(redisadapter.Config{
			Host:     infraCfg.RedisHost,
			Port:     infraCfg.RedisPort,
			Password: infraCfg.RedisPassword,
		}, logger)
		if err != nil {
			logger.Warn("failed to initialize redis provisioner", "error", err)
		} else {
			cacheProvisioner = p
			logger.Info("redis provisioner initialized", "host", infraCfg.RedisHost)
		}
	}

	// Initialize CodeAgent registry (multi-provider support).
	agentRegistry := codeagent.NewRegistry()

	// Register Claude Code adapter (default - always available).
	claudeCodeAdapter := claudecode.NewAdapter(namespace)
	agentRegistry.Register(claudeCodeAdapter)
	logger.Info("registered Claude Code agent", "provider", claudeCodeAdapter.Provider())

	// Register OpenCode adapter (optional - only if configured).
	if cfg.OpenCodeURL != "" {
		openCodeAdapter := opencode.NewAdapter(opencode.ClientConfig{
			BaseURL:  cfg.OpenCodeURL,
			Username: cfg.OpenCodeUsername,
			Password: cfg.OpenCodePassword,
			Timeout:  30 * time.Second,
		})
		agentRegistry.Register(openCodeAdapter)
		logger.Info("registered OpenCode agent", "provider", openCodeAdapter.Provider(), "url", cfg.OpenCodeURL)
	}

	// Create services.
	projectService := service.NewProjectService(projectRepo, k8sExecutor, streamPub).
		WithAuditLogger(auditLogger).
		WithCommandQueue(commandQueue).
		WithWebhookDispatcher(webhookDispatcher).
		WithCodeAgentRegistry(agentRegistry)

	// Create work service (for worker pool task management).
	workService := service.NewWorkService(workQueueRepo, service.WorkServiceConfig{
		Logger: logger,
	}).WithWebhookDispatcher(webhookDispatcher)

	// Initialize worker pool infrastructure.
	workerRegistryRepo := postgres.NewWorkerRegistryRepository(database.DB)
	buildAuditRepo := postgres.NewBuildAuditRepository(database.DB)

	// Create worker service (manages worker lifecycle and task assignment).
	workerService := service.NewWorkerService(workerRegistryRepo, workQueueRepo, logger).
		WithBuildAudit(buildAuditRepo)

	// Create build service (orchestrates build submission and tracking).
	buildService := service.NewBuildService(workQueueRepo, buildAuditRepo, logger)

	// Create app.
	app := api.New("rdev-api",
		api.WithPort(cfg.Port),
		api.WithLogger(logger),
	)

	// Add telemetry middleware (first to capture all requests).
	app.Use(telemetry.Middleware(telCfg.ServiceName))
	// Add metrics middleware (before auth to track all requests).
	app.Use(metrics.Middleware)
	// Add auth middleware (skips /health, /ready, /docs, /openapi.json, /metrics).
	app.Use(auth.Middleware(authService))
	// Add rate limiting middleware (after auth, so we have API key context).
	rateLimitCfg := middleware.DefaultRateLimitConfig()
	rateLimitCfg.Limiter = rateLimiter
	app.Use(middleware.RateLimitMiddleware(rateLimitCfg))

	// Register metrics endpoint (no auth required).
	app.Router().Handle("/metrics", metrics.Handler())

	// Initialize handlers.
	projectsHandler := handlers.NewProjectsHandlerWithService(projectService)
	keysHandler := handlers.NewKeysHandler(authService)
	claudeConfigHandler := handlers.NewClaudeConfigHandlerWithService(projectService, projectRepo, k8sExecutor)
	auditHandler := handlers.NewAuditHandler(auditLogger)
	queueHandler := handlers.NewQueueHandler(commandQueue, projectRepo)
	webhookHandler := handlers.NewWebhookHandler(webhookRepo, projectRepo)
	workHandler := handlers.NewWorkHandler(workService)

	// Initialize domain and slug repositories.
	projectDomainRepo := postgres.NewProjectDomainRepository(database.DB)
	slugGenerator := postgres.NewSlugRepository(database.DB)

	// Initialize project infrastructure service (orchestrates full project lifecycle).
	// NOTE(review): giteaClient, dnsClient, deployerAdapter, woodpeckerClient and
	// templateProvider may be nil concrete pointers here. If these constructors
	// take interface parameters, a nil pointer becomes a non-nil interface and
	// "adapter == nil" checks inside them will not fire — confirm the param types.
	projectInfraService := service.NewProjectInfraService(
		database.DB,
		giteaClient,
		dnsClient,
		deployerAdapter,
		woodpeckerClient, // CI provider for auto-activating repos
		templateProvider, // Template provider for seeding repos
		projectDomainRepo,
		slugGenerator,
		service.ProjectInfraConfig{
			DefaultGitOwner: infraCfg.GiteaDefaultOrg,
			DefaultDomain:   infraCfg.DefaultDomain,
			ClusterIP:       infraCfg.ClusterIP,
			Logger:          logger,
		},
	)

	// Wire optional database and cache provisioners.
	if dbProvisioner != nil {
		projectInfraService = projectInfraService.WithDatabaseProvisioner(dbProvisioner)
	}
	if cacheProvisioner != nil {
		projectInfraService = projectInfraService.WithCacheProvisioner(cacheProvisioner)
	}

	// Create domain service adapter for infrastructure handler.
	domainServiceAdapter := handlers.NewDomainServiceAdapter(projectInfraService)

	// Initialize infrastructure handler (for threesix.ai git/deploy/dns/ci).
	infraHandler := handlers.NewInfrastructureHandler(
		giteaClient,
		dnsClient,
		deployerAdapter,
		projectRepo,
		woodpeckerClient,
		domainServiceAdapter,
		handlers.InfrastructureConfig{
			DefaultGitOwner: infraCfg.GiteaDefaultOrg,
			DefaultDomain:   infraCfg.DefaultDomain,
			ClusterIP:       infraCfg.ClusterIP,
		},
	)

	// Initialize project management handler.
	projectMgmtHandler := handlers.NewProjectManagementHandler(projectInfraService, logger)

	// Initialize component service and handler (for monorepo component management).
	var componentsHandler *handlers.ComponentsHandler
	if infraCfg.GiteaToken != "" && infraCfg.GiteaURL != "" && templateProvider != nil {
		bulkFileClient := gitea.NewBulkFileClient(infraCfg.GiteaURL, infraCfg.GiteaToken)
		componentService := service.NewComponentService(
			database.DB,
			templateProvider,
			bulkFileClient,
			deployerAdapter, // Creates initial K8s deployment for new components
			service.ComponentServiceConfig{
				DefaultGitOwner: infraCfg.GiteaDefaultOrg,
				RegistryURL:     infraCfg.RegistryURL,
				Logger:          logger,
			},
		)
		componentsHandler = handlers.NewComponentsHandler(componentService, logger)
		logger.Info("component service initialized")
	}

	// Initialize Woodpecker webhook handler (for CI/CD auto-deploy).
	woodpeckerHandler := handlers.NewWoodpeckerWebhookHandler(
		deployerAdapter,
		dnsClient,
		handlers.WoodpeckerWebhookConfig{
			WebhookSecret: infraCfg.WoodpeckerWebhookSecret,
			DefaultDomain: infraCfg.DefaultDomain,
			RegistryURL:   infraCfg.RegistryURL,
			ClusterIP:     infraCfg.ClusterIP,
			Logger:        logger,
		},
	)

	// Initialize credentials handler (superadmin only).
	credentialsHandler := handlers.NewCredentialsHandler(credentialStore)

	// Initialize agents handler (for code agent management).
	agentsHandler := handlers.NewAgentsHandler(agentRegistry)

	// Initialize worker pool handlers.
	workersHandler := handlers.NewWorkersHandler(workerService)
	buildsHandler := handlers.NewBuildsHandler(buildService)
	createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService, logger)

	// Override default health/ready endpoints with full dependency checks.
	healthHandler := handlers.NewHealthHandler("rdev-api", database.DB, nil).
		WithAgentRegistry(agentRegistry)
	app.Router().Get("/health", healthHandler.Health)
	app.Router().Get("/ready", healthHandler.Ready)

	// Register routes.
	projectsHandler.Mount(app.Router())
	keysHandler.Mount(app.Router())
	claudeConfigHandler.Mount(app.Router())
	auditHandler.Mount(app.Router())
	queueHandler.Mount(app.Router())
	webhookHandler.Mount(app.Router())
	workHandler.Mount(app.Router())
	infraHandler.Mount(app.Router())
	projectMgmtHandler.Mount(app.Router())
	if componentsHandler != nil {
		componentsHandler.Mount(app.Router())
	}
	woodpeckerHandler.Mount(app.Router())
	credentialsHandler.Mount(app.Router())
	agentsHandler.Mount(app.Router())
	workersHandler.Mount(app.Router())
	buildsHandler.Mount(app.Router())
	createAndBuildHandler.Mount(app.Router())

	// Start queue processor worker (per-project command queue).
	queueProcessor := worker.NewQueueProcessor(
		commandQueue,
		k8sExecutor,
		projectRepo,
		streamPub,
		&worker.QueueProcessorConfig{
			PollPeriod: 5 * time.Second,
			Logger:     logger,
		},
	).WithWebhookDispatcher(webhookDispatcher)
	if err := queueProcessor.Start(); err != nil {
		fatal("failed to start queue processor", err)
	}

	// Start work executor (cross-project worker pool).
	// PodGitOperations runs git commands inside the pod via kubectl exec.
	// This ensures deterministic post-build commit/push instead of relying on LLMs.
	// It must target the same namespace as the executor and the Claude Code
	// adapter above; previously this was hard-coded to "rdev", which broke
	// post-build pushes whenever K8S_NAMESPACE was set to anything else.
	var podGitOps *worker.PodGitOperations
	if infraCfg.GiteaToken != "" {
		podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{
			Namespace:  namespace,
			GiteaToken: infraCfg.GiteaToken,
			Logger:     logger,
		})
	}
	buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil)
	workerCfg := worker.DefaultWorkExecutorConfig()
	workerCfg.Logger = logger
	workExecutor := worker.NewWorkExecutor(
		workerService,
		workService,
		buildExecutor,
		workerCfg,
	)
	if err := workExecutor.Start(); err != nil {
		fatal("failed to start work executor", err)
	}
	healthHandler.WithWorkExecutor(workExecutor)

	// Start queue maintenance worker (stale task recovery, worker health, cleanup, metrics).
	queueMaintenance := worker.NewQueueMaintenance(
		workQueueRepo,
		workerRegistryRepo,
		&worker.QueueMaintenanceConfig{
			StaleTaskTimeout:   30 * time.Minute,
			StaleWorkerTimeout: 2 * time.Minute,
			CleanupAge:         7 * 24 * time.Hour,
			MaintenancePeriod:  1 * time.Minute,
			MetricsPeriod:      15 * time.Second,
			BuildAudit:         buildAuditRepo, // Sync build audit when requeuing stale tasks
			Logger:             logger,
		},
	)
	queueMaintenance.Start()

	// Enable API documentation.
	app.EnableDocs(buildOpenAPISpec())

	// Graceful shutdown: background workers are stopped before the components
	// they were wired to (webhook dispatcher, provisioners), and telemetry and
	// the database are closed last.
	app.OnShutdown(func(ctx context.Context) error {
		workExecutor.Stop()
		queueMaintenance.Stop()
		queueProcessor.Stop()
		webhookDispatcher.Stop()
		projectRepo.StopWatching()
		stopRateLimitCleanup()
		closeProvisioner(dbProvisioner, "database", logger)
		closeProvisioner(cacheProvisioner, "cache", logger)
		if err := tel.Shutdown(ctx); err != nil {
			logger.Error("telemetry shutdown error", "error", err)
		}
		return database.Close()
	})

	logger.Info("rdev-api starting",
		"version", version,
		"port", cfg.Port,
		"db_host", cfg.DBHost,
		"admin_key_set", cfg.AdminKey != "",
	)

	app.Run()
}
// Config, InfraConfig, loadConfig, loadInfraConfig are defined in config.go.