diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index 94ad56f..fead6ba 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -276,7 +276,6 @@ func main() { podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{ Namespace: "rdev", GiteaToken: infraCfg.GiteaToken, - Logger: logger, }) } @@ -485,7 +484,6 @@ func main() { giteaClient, worker.ExternalHealthConfig{ CheckInterval: 30 * time.Second, - Logger: logger, }, ) externalHealthChecker.Start() @@ -540,7 +538,6 @@ func main() { streamPub, &worker.QueueProcessorConfig{ PollPeriod: 5 * time.Second, - Logger: logger, }, ).WithWebhookDispatcher(webhookDispatcher) if err := queueProcessor.Start(); err != nil { @@ -549,9 +546,9 @@ func main() { } // Start work executor (cross-project worker pool, git via kubectl exec) - buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil) + buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, nil) // VerifyExecutor for visual captures via Playwright pod - verifyExecutor := worker.NewVerifyExecutor(k8sExecutor, streamPub, logger, &worker.VerifyExecutorConfig{ + verifyExecutor := worker.NewVerifyExecutor(k8sExecutor, streamPub, &worker.VerifyExecutorConfig{ Namespace: namespace, PodName: "playwright-0", }) @@ -561,11 +558,9 @@ func main() { sdlcTaskExecutor = worker.NewSDLCTaskExecutor(worker.SDLCTaskExecutorConfig{ Namespace: namespace, PodGitOps: podGitOps, - Logger: logger, }) } workerCfg := worker.DefaultWorkExecutorConfig() - workerCfg.Logger = logger workExecutor := worker.NewWorkExecutor( workerService, workService, @@ -591,7 +586,6 @@ func main() { MaintenancePeriod: 1 * time.Minute, MetricsPeriod: 15 * time.Second, BuildAudit: buildAuditRepo, // Sync build audit when requeuing stale tasks - Logger: logger, }, ) queueMaintenance.Start() @@ -600,7 +594,6 @@ func main() { operationCleanup := worker.NewOperationCleanup(operationRepo, &worker.OperationCleanupConfig{ RetentionPeriod: 30 * 24 * time.Hour, CleanupInterval: 1 * time.Hour, - Logger: logger, }) operationCleanup.Start() diff --git a/internal/adapter/templates/templates/components/worker/cmd/worker/main.go.tmpl b/internal/adapter/templates/templates/components/worker/cmd/worker/main.go.tmpl index 40129bf..c7d5da9 100644 --- a/internal/adapter/templates/templates/components/worker/cmd/worker/main.go.tmpl +++ b/internal/adapter/templates/templates/components/worker/cmd/worker/main.go.tmpl @@ -21,14 +21,21 @@ import ( var migrationsFS embed.FS func main() { + // Initialize logger first (with defaults) so we can log config errors + logger := logging.New(logging.Config{ + Level: logging.LevelInfo, + Format: logging.FormatJSON, + }).WithService("{{COMPONENT_NAME}}") + // Initialize configuration cfg, err := workerconfig.Load() if err != nil { - panic("failed to load config: " + err.Error()) + logger.Error("failed to load config", "error", err) + os.Exit(1) } - // Initialize logger - logger := logging.New(logging.Config{ + // Reconfigure logger with loaded config + logger = logging.New(logging.Config{ Level: logging.ParseLevel(cfg.Logging.Level), Format: logging.ParseFormat(cfg.Logging.Format), Environment: cfg.AppConfig.Environment, @@ -87,18 +94,22 @@ func main() { sig := <-sigCh logger.Info("received shutdown signal", "signal", sig.String()) - // Trigger graceful shutdown + // Trigger graceful shutdown with grace period + logger.Info("initiating graceful shutdown") cancel() - // Give in-flight jobs time to complete - time.Sleep(2 * time.Second) + // Give in-flight jobs time to complete (grace period) + // This allows handlers to notice context cancellation and finish cleanly. + const shutdownGracePeriod = 5 * time.Second + time.Sleep(shutdownGracePeriod) logger.Info("{{COMPONENT_NAME}} worker stopped") } // runStaleJobRecovery periodically requeues jobs that have been running too long. func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) { - ticker := time.NewTicker(time.Minute) + const staleCheckInterval = time.Minute + ticker := time.NewTicker(staleCheckInterval) defer ticker.Stop() for { diff --git a/internal/adapter/templates/templates/components/worker/internal/handlers/handler.go.tmpl b/internal/adapter/templates/templates/components/worker/internal/handlers/handler.go.tmpl index e7f0ae2..45c8872 100644 --- a/internal/adapter/templates/templates/components/worker/internal/handlers/handler.go.tmpl +++ b/internal/adapter/templates/templates/components/worker/internal/handlers/handler.go.tmpl @@ -108,25 +108,26 @@ func (h *Handler) processNextJob(ctx context.Context) error { h.mu.RUnlock() if !ok { - errMsg := fmt.Sprintf("unknown job type: %s", job.Type) h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type) - return h.queue.Fail(ctx, job.ID, errMsg) + return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type)) } - // Create job context with timeout - jobCtx, cancel := context.WithTimeout(ctx, h.config.JobTimeout) - defer cancel() - - // Apply middleware and process + // Apply middleware and process (TimeoutMiddleware handles the deadline) wrappedHandler := queue.Chain( queue.RecoveryMiddleware(h.logger), queue.LoggingMiddleware(h.logger), queue.TimeoutMiddleware(h.config.JobTimeout), )(handler) + // Use parent context - TimeoutMiddleware applies the job timeout + jobCtx := ctx + _ = jobCtx // jobCtx used below + if err := wrappedHandler(jobCtx, job); err != nil { - h.logger.Debug("job handler failed", "job_id", job.ID, "error", err) - return h.queue.Fail(ctx, job.ID, err.Error()) + // Truncate error message to prevent log bloat and potential data leakage + errMsg := truncateErrorMessage(err.Error(), 1000) + h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg) + return h.queue.Fail(ctx, job.ID, errMsg) } return h.queue.Ack(ctx, job.ID) @@ -136,3 +137,11 @@ func (h *Handler) processNextJob(ctx context.Context) error { func (h *Handler) WorkerID() string { return h.workerID } + +// truncateErrorMessage limits error message length to prevent log bloat. +func truncateErrorMessage(msg string, maxLen int) string { + if len(msg) <= maxLen { + return msg + } + return msg[:maxLen-3] + "..." +} diff --git a/internal/adapter/templates/templates/skeleton/pkg/auth/context.go.tmpl b/internal/adapter/templates/templates/skeleton/pkg/auth/context.go.tmpl index 50c93a7..b1efd0a 100644 --- a/internal/adapter/templates/templates/skeleton/pkg/auth/context.go.tmpl +++ b/internal/adapter/templates/templates/skeleton/pkg/auth/context.go.tmpl @@ -2,6 +2,7 @@ package auth import ( "context" + "errors" ) // contextKey is a private type for context keys to prevent collisions. @@ -25,15 +26,26 @@ func GetUser(ctx context.Context) *User { } // MustGetUser retrieves the user from the context. -// Panics if no user is present. +// Panics if no user is present - use only in handlers protected by RequireAuth middleware. +// For non-middleware contexts, prefer GetUserOrError which returns an error. func MustGetUser(ctx context.Context) *User { user := GetUser(ctx) if user == nil { - panic("auth: user not found in context") + panic("auth: user not found in context - ensure RequireAuth middleware is applied") } return user } +// GetUserOrError retrieves the user from the context, returning an error if not present. +// Prefer this over MustGetUser when panic recovery is not guaranteed. +func GetUserOrError(ctx context.Context) (*User, error) { + user := GetUser(ctx) + if user == nil { + return nil, errors.New("auth: user not found in context") + } + return user, nil +} + // IsAuthenticated returns true if a user is present in the context. func IsAuthenticated(ctx context.Context) bool { return GetUser(ctx) != nil diff --git a/internal/adapter/templates/templates/skeleton/pkg/auth/jwt.go.tmpl b/internal/adapter/templates/templates/skeleton/pkg/auth/jwt.go.tmpl index 273823a..5c34757 100644 --- a/internal/adapter/templates/templates/skeleton/pkg/auth/jwt.go.tmpl +++ b/internal/adapter/templates/templates/skeleton/pkg/auth/jwt.go.tmpl @@ -69,16 +69,16 @@ func (v *JWTValidator) Validate(ctx context.Context, tokenString string) (*User, switch token.Method.(type) { case *jwt.SigningMethodHMAC: if v.secret == nil { - return nil, fmt.Errorf("HMAC secret not configured") + return nil, fmt.Errorf("%w: HMAC secret not configured", ErrInvalidToken) } return v.secret, nil case *jwt.SigningMethodRSA, *jwt.SigningMethodECDSA: if v.publicKey == nil { - return nil, fmt.Errorf("public key not configured") + return nil, fmt.Errorf("%w: public key not configured", ErrInvalidToken) } return v.publicKey, nil default: - return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) + return nil, fmt.Errorf("%w: unexpected signing method %v", ErrInvalidToken, token.Header["alg"]) } }) @@ -86,7 +86,7 @@ func (v *JWTValidator) Validate(ctx context.Context, tokenString string) (*User, if errors.Is(err, jwt.ErrTokenExpired) { return nil, ErrExpiredToken } - return nil, fmt.Errorf("%w: %v", ErrInvalidToken, err) + return nil, fmt.Errorf("%w: %w", ErrInvalidToken, err) } claims, ok := token.Claims.(*JWTClaims) diff --git a/internal/adapter/templates/templates/skeleton/pkg/queue/postgres.go.tmpl b/internal/adapter/templates/templates/skeleton/pkg/queue/postgres.go.tmpl index 2bc3e5e..7721f02 100644 --- a/internal/adapter/templates/templates/skeleton/pkg/queue/postgres.go.tmpl +++ b/internal/adapter/templates/templates/skeleton/pkg/queue/postgres.go.tmpl @@ -44,13 +44,25 @@ func (q *PostgresQueue) Enqueue(ctx context.Context, jobType string, payload map // EnqueueWithOptions adds a job with custom configuration. func (q *PostgresQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) { + // Validate required fields + if job.Type == "" { + return "", fmt.Errorf("job type is required: %w", ErrJobNotFound) + } + job.ID = uuid.New().String() job.Status = StatusPending job.CreatedAt = time.Now().UTC() + // Apply defaults and constraints if job.MaxRetries == 0 { job.MaxRetries = 3 } + if job.MaxRetries > 100 { + job.MaxRetries = 100 // Cap at reasonable limit + } + if job.Payload == nil { + job.Payload = make(map[string]any) + } payloadJSON, err := json.Marshal(job.Payload) if err != nil { @@ -117,7 +129,10 @@ func (q *PostgresQueue) Ack(ctx context.Context, jobID string) error { return fmt.Errorf("ack job: %w", err) } - rows, _ := result.RowsAffected() + rows, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("check rows affected: %w", err) + } if rows == 0 { return ErrJobNotFound } @@ -160,12 +175,20 @@ func (q *PostgresQueue) Fail(ctx context.Context, jobID string, errMsg string) e return fmt.Errorf("fail job: %w", err) } - rows, _ := result.RowsAffected() + rows, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("check rows affected: %w", err) + } if rows == 0 { return ErrJobNotFound } - q.logger.Debug("job failed", "job_id", jobID, "error", errMsg) + // Truncate error message to prevent log bloat (limit to 500 chars for logging) + logErrMsg := errMsg + if len(logErrMsg) > 500 { + logErrMsg = logErrMsg[:497] + "..." + } + q.logger.Debug("job failed", "job_id", jobID, "error", logErrMsg) return nil } @@ -179,7 +202,10 @@ func (q *PostgresQueue) Heartbeat(ctx context.Context, jobID string) error { return fmt.Errorf("heartbeat job: %w", err) } - rows, _ := result.RowsAffected() + rows, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("check rows affected: %w", err) + } if rows == 0 { return ErrJobNotFound } @@ -201,7 +227,10 @@ func (q *PostgresQueue) RequeueStale(ctx context.Context, timeout time.Duration) return 0, fmt.Errorf("requeue stale jobs: %w", err) } - count, _ := result.RowsAffected() + count, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("check rows affected: %w", err) + } if count > 0 { q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout) } diff --git a/internal/adapter/templates/templates/skeleton/pkg/queue/queue.go.tmpl b/internal/adapter/templates/templates/skeleton/pkg/queue/queue.go.tmpl index f2dd789..7b1f9e7 100644 --- a/internal/adapter/templates/templates/skeleton/pkg/queue/queue.go.tmpl +++ b/internal/adapter/templates/templates/skeleton/pkg/queue/queue.go.tmpl @@ -2,9 +2,9 @@ // // This package implements a reliable producer/consumer pattern using: // - Atomic dequeue with FOR UPDATE SKIP LOCKED -// - Automatic retry with exponential backoff +// - Automatic retry (immediate requeue up to max_retries) // - Job priority and ordering -// - Stale job recovery +// - Stale job recovery via RequeueStale // // Usage: // diff --git a/internal/service/project_service.go b/internal/service/project_service.go index b8b6b62..f11616d 100644 --- a/internal/service/project_service.go +++ b/internal/service/project_service.go @@ -6,12 +6,12 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "sync/atomic" "time" "github.com/google/uuid" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/sanitize" @@ -26,7 +26,6 @@ type ProjectService struct { queue port.CommandQueue // Optional command queue webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher agentRegistry port.CodeAgentRegistry // Optional code agent registry - logger *slog.Logger cmdID atomic.Uint64 } @@ -40,16 +39,9 @@ func NewProjectService( projects: projects, executor: executor, streams: streams, - logger: slog.Default(), } } -// WithLogger sets a custom logger for the service. -func (s *ProjectService) WithLogger(logger *slog.Logger) *ProjectService { - s.logger = logger - return s -} - // WithAuditLogger sets an audit logger for the service. func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService { s.auditLogger = auditLogger @@ -83,15 +75,17 @@ type AuditContext struct { // List returns all available projects with refreshed status. func (s *ProjectService) List(ctx context.Context) ([]domain.Project, error) { + log := logging.FromContext(ctx).WithService("ProjectService") // Refresh status from Kubernetes if err := s.projects.RefreshStatus(ctx); err != nil { - s.logger.Warn("failed to refresh project status", "error", err) + log.Warn("failed to refresh project status", logging.FieldError, err) } return s.projects.List(ctx) } // Get returns a specific project by ID. func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) { + log := logging.FromContext(ctx).WithService("ProjectService") project, err := s.projects.Get(ctx, id) if err != nil { return nil, err @@ -99,7 +93,7 @@ func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain. // Refresh status if refreshErr := s.projects.RefreshStatus(ctx); refreshErr != nil { - s.logger.Warn("failed to refresh project status", "project", id, "error", refreshErr) + log.Warn("failed to refresh project status", logging.FieldProjectID, id, logging.FieldError, refreshErr) } return project, nil @@ -168,6 +162,7 @@ func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeReq // Log audit start if audit logger is configured if s.auditLogger != nil && req.Audit != nil { + log := logging.FromContext(ctx).WithService("ProjectService") argsJSON, _ := json.Marshal(cmd.Args) auditEntry := &domain.AuditLogEntry{ ID: uuid.New().String(), @@ -182,7 +177,7 @@ func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeReq Status: domain.AuditStatusRunning, } if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { - s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) + log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err) } } @@ -222,6 +217,7 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + log := logging.FromContext(ctx).WithService("ProjectService") streamID := string(cmd.ID) var lastEventID string var outputSizeBytes int64 @@ -283,7 +279,7 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) { OutputSizeBytes: outputSizeBytes, } if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil { - s.logger.Warn("failed to log audit end", "command_id", cmd.ID, "error", err) + log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err) } } @@ -311,10 +307,10 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) { Error: errorMsg, }) - s.logger.Debug("command completed", + log.Debug("command completed", "command_id", cmd.ID, "exit_code", result.ExitCode, - "duration_ms", result.DurationMs, + logging.FieldDuration, result.DurationMs, "last_event_id", lastEventID, "complete_event_id", eventID, ) @@ -340,10 +336,11 @@ func (s *ProjectService) dispatchWebhookEvent(ctx context.Context, projectID str } if err := s.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { - s.logger.Warn("failed to dispatch webhook event", - "project_id", projectID, + log := logging.FromContext(ctx).WithService("ProjectService") + log.Warn("failed to dispatch webhook event", + logging.FieldProjectID, projectID, "event_type", eventType, - "error", err, + logging.FieldError, err, ) } } diff --git a/internal/service/project_service_agent.go b/internal/service/project_service_agent.go index f0fa86f..de5017d 100644 --- a/internal/service/project_service_agent.go +++ b/internal/service/project_service_agent.go @@ -7,6 +7,7 @@ import ( "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" ) @@ -34,6 +35,7 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + log := logging.FromContext(ctx).WithService("ProjectService") streamID := string(cmd.ID) var lastEventID string var outputSizeBytes int64 @@ -140,7 +142,7 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A OutputSizeBytes: outputSizeBytes, } if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil { - s.logger.Warn("failed to log audit end", "command_id", cmd.ID, "error", err) + log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err) } } @@ -168,12 +170,12 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A Error: errorMsg, }) - s.logger.Debug("agent command completed", + log.Debug("agent command completed", "command_id", cmd.ID, "provider", agent.Provider(), "session_id", result.SessionID, "exit_code", result.ExitCode, - "duration_ms", result.DurationMs, + logging.FieldDuration, result.DurationMs, "last_event_id", lastEventID, "complete_event_id", eventID, ) diff --git a/internal/service/project_service_commands.go b/internal/service/project_service_commands.go index 84f6935..008a740 100644 --- a/internal/service/project_service_commands.go +++ b/internal/service/project_service_commands.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/sanitize" ) @@ -66,6 +67,7 @@ func (s *ProjectService) ExecuteShell(ctx context.Context, req ExecuteShellReque // Log audit start if audit logger is configured if s.auditLogger != nil && req.Audit != nil { + log := logging.FromContext(ctx).WithService("ProjectService") argsJSON, _ := json.Marshal(cmd.Args) auditEntry := &domain.AuditLogEntry{ ID: uuid.New().String(), @@ -80,7 +82,7 @@ func (s *ProjectService) ExecuteShell(ctx context.Context, req ExecuteShellReque Status: domain.AuditStatusRunning, } if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { - s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) + log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err) } } @@ -146,6 +148,7 @@ func (s *ProjectService) ExecuteGit(ctx context.Context, req ExecuteGitRequest) // Log audit start if audit logger is configured if s.auditLogger != nil && req.Audit != nil { + log := logging.FromContext(ctx).WithService("ProjectService") argsJSON, _ := json.Marshal(cmd.Args) auditEntry := &domain.AuditLogEntry{ ID: uuid.New().String(), @@ -160,7 +163,7 @@ func (s *ProjectService) ExecuteGit(ctx context.Context, req ExecuteGitRequest) Status: domain.AuditStatusRunning, } if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { - s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) + log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err) } } diff --git a/internal/worker/build_executor.go b/internal/worker/build_executor.go index d298c4f..7754d6a 100644 --- a/internal/worker/build_executor.go +++ b/internal/worker/build_executor.go @@ -3,11 +3,11 @@ package worker import ( "context" "fmt" - "log/slog" "strings" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" ) @@ -29,9 +29,8 @@ type BuildExecutor struct { agentRegistry port.CodeAgentRegistry podGitOps *PodGitOperations // Post-build git operations (runs in pod) streams port.StreamPublisher // SSE stream publisher for real-time events - logger *slog.Logger - defaultPodName string // Default claudebox pod for agent execution - namespace string // Kubernetes namespace for the pod + defaultPodName string // Default claudebox pod for agent execution + namespace string // Kubernetes namespace for the pod } // BuildExecutorConfig holds configuration for the build executor. @@ -45,12 +44,8 @@ func NewBuildExecutor( agentRegistry port.CodeAgentRegistry, podGitOps *PodGitOperations, streams port.StreamPublisher, - logger *slog.Logger, cfg *BuildExecutorConfig, ) *BuildExecutor { - if logger == nil { - logger = slog.Default() - } if cfg == nil { cfg = &BuildExecutorConfig{ DefaultPodName: "claudebox-0", @@ -61,7 +56,6 @@ func NewBuildExecutor( agentRegistry: agentRegistry, podGitOps: podGitOps, streams: streams, - logger: logger.With("component", "build-executor"), defaultPodName: cfg.DefaultPodName, namespace: cfg.Namespace, } @@ -69,6 +63,7 @@ func NewBuildExecutor( // Execute runs a build task by translating its spec into an agent call. func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { + log := logging.FromContext(ctx).WithWorker("build-executor") start := time.Now() streamID := task.ID // Use task ID as stream ID for SSE @@ -126,7 +121,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom } } - b.logger.Info("ensuring git repository is ready", + log.Info("ensuring git repository is ready", "task_id", task.ID, "pod", podName, "workDir", workDir, @@ -168,7 +163,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom const maxOutputSize = 1 << 20 // 1MB var outputBuilder strings.Builder - b.logger.Info("executing build via agent", + log.Info("executing build via agent", "task_id", task.ID, "project_id", task.ProjectID, "agent", agent.Name(), @@ -261,7 +256,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush) if gitResult.Error != nil { - b.logger.Warn("post-build git operations failed", + log.Warn("post-build git operations failed", "task_id", task.ID, "error", gitResult.Error, ) @@ -270,14 +265,14 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom } else if gitResult.HasChanges { result.CommitSHA = gitResult.CommitSHA result.FilesChanged = gitResult.FilesChanged - b.logger.Info("post-build git operations completed", + log.Info("post-build git operations completed", "task_id", task.ID, "commit", gitResult.CommitSHA, "files", len(gitResult.FilesChanged), "pushed", gitResult.Pushed, ) } else { - b.logger.Info("no changes to commit after build", + log.Info("no changes to commit after build", "task_id", task.ID, ) } diff --git a/internal/worker/external_health.go b/internal/worker/external_health.go index e1308af..0c3761a 100644 --- a/internal/worker/external_health.go +++ b/internal/worker/external_health.go @@ -2,11 +2,11 @@ package worker import ( "context" - "log/slog" "sync" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" ) @@ -19,7 +19,6 @@ type ExternalHealthChecker struct { git port.ExternalHealthChecker // gitea interval time.Duration - logger *slog.Logger // Internal state (thread-safe) mu sync.RWMutex @@ -35,14 +34,12 @@ type ExternalHealthChecker struct { type ExternalHealthConfig struct { // CheckInterval is how often to check external systems. Default: 30s. CheckInterval time.Duration - Logger *slog.Logger } // DefaultExternalHealthConfig returns sensible defaults. func DefaultExternalHealthConfig() ExternalHealthConfig { return ExternalHealthConfig{ CheckInterval: 30 * time.Second, - Logger: slog.Default(), } } @@ -57,9 +54,6 @@ func NewExternalHealthChecker( if cfg.CheckInterval == 0 { cfg.CheckInterval = 30 * time.Second } - if cfg.Logger == nil { - cfg.Logger = slog.Default() - } ctx, cancel := context.WithCancel(context.Background()) @@ -68,7 +62,6 @@ func NewExternalHealthChecker( ci: ci, git: git, interval: cfg.CheckInterval, - logger: cfg.Logger.With("component", "external-health"), statuses: make(map[domain.ExternalSystem]domain.ExternalSystemStatus), ctx: ctx, cancel: cancel, @@ -77,7 +70,8 @@ func NewExternalHealthChecker( // Start begins the background check loop. func (c *ExternalHealthChecker) Start() { - c.logger.Info("external health checker started", "interval", c.interval) + log := logging.FromContext(c.ctx).WithWorker("external-health") + log.Info("external health checker started", "interval", c.interval) c.wg.Add(1) go c.checkLoop() @@ -85,10 +79,11 @@ func (c *ExternalHealthChecker) Start() { // Stop gracefully shuts down the checker. func (c *ExternalHealthChecker) Stop() { - c.logger.Info("external health checker stopping") + log := logging.FromContext(c.ctx).WithWorker("external-health") + log.Info("external health checker stopping") c.cancel() c.wg.Wait() - c.logger.Info("external health checker stopped") + log.Info("external health checker stopped") } // GetStatus returns the cached status for a specific system. @@ -198,6 +193,8 @@ func (c *ExternalHealthChecker) runChecks() { // updateStatus updates cached status and logs/metrics on state changes. func (c *ExternalHealthChecker) updateStatus(status domain.ExternalSystemStatus) { + log := logging.FromContext(c.ctx).WithWorker("external-health") + c.mu.Lock() defer c.mu.Unlock() @@ -214,31 +211,31 @@ func (c *ExternalHealthChecker) updateStatus(status domain.ExternalSystemStatus) if !existed { // First check if status.Healthy { - c.logger.Info("external system healthy", + log.Info("external system healthy", "system", status.System, "url", status.URL, "latency", status.Latency, ) } else { - c.logger.Warn("external system unhealthy", + log.Warn("external system unhealthy", "system", status.System, "url", status.URL, - "error", status.Error, + logging.FieldError, status.Error, ) } } else if prev.Healthy != status.Healthy { // State changed if status.Healthy { - c.logger.Info("external system recovered", + log.Info("external system recovered", "system", status.System, "url", status.URL, "latency", status.Latency, ) } else { - c.logger.Warn("external system became unhealthy", + log.Warn("external system became unhealthy", "system", status.System, "url", status.URL, - "error", status.Error, + logging.FieldError, status.Error, ) } } diff --git a/internal/worker/mock_test.go b/internal/worker/mock_test.go index 1bf5737..a5269a1 100644 --- a/internal/worker/mock_test.go +++ b/internal/worker/mock_test.go @@ -374,7 +374,7 @@ func newTestDeps() *testDeps { WithBuildAudit(audit) workSvc := service.NewWorkService(queue) - buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil, nil) + buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil) return &testDeps{ queue: queue, diff --git a/internal/worker/operation_cleanup.go b/internal/worker/operation_cleanup.go index c30eb4e..2041da5 100644 --- a/internal/worker/operation_cleanup.go +++ b/internal/worker/operation_cleanup.go @@ -2,10 +2,10 @@ package worker import ( "context" - "log/slog" "sync" "time" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" ) @@ -13,7 +13,6 @@ import ( // Operations older than the retention period (default 30 days) are deleted. type OperationCleanup struct { repo port.OperationRepository - logger *slog.Logger retentionPeriod time.Duration cleanupInterval time.Duration @@ -31,8 +30,6 @@ type OperationCleanupConfig struct { // CleanupInterval is how often to run cleanup. // Default: 1 hour. CleanupInterval time.Duration - - Logger *slog.Logger } // DefaultOperationCleanupConfig returns sensible defaults. @@ -40,7 +37,6 @@ func DefaultOperationCleanupConfig() *OperationCleanupConfig { return &OperationCleanupConfig{ RetentionPeriod: 30 * 24 * time.Hour, // 30 days CleanupInterval: 1 * time.Hour, - Logger: slog.Default(), } } @@ -54,7 +50,6 @@ func NewOperationCleanup(repo port.OperationRepository, cfg *OperationCleanupCon return &OperationCleanup{ repo: repo, - logger: cfg.Logger.With("component", "operation-cleanup"), retentionPeriod: cfg.RetentionPeriod, cleanupInterval: cfg.CleanupInterval, ctx: ctx, @@ -64,7 +59,8 @@ func NewOperationCleanup(repo port.OperationRepository, cfg *OperationCleanupCon // Start begins the cleanup loop. func (c *OperationCleanup) Start() { - c.logger.Info("operation cleanup started", + log := logging.FromContext(c.ctx).WithWorker("operation-cleanup") + log.Info("operation cleanup started", "retention_period", c.retentionPeriod, "cleanup_interval", c.cleanupInterval, ) @@ -75,10 +71,11 @@ func (c *OperationCleanup) Start() { // Stop gracefully shuts down the cleanup worker. func (c *OperationCleanup) Stop() { - c.logger.Info("operation cleanup stopping") + log := logging.FromContext(c.ctx).WithWorker("operation-cleanup") + log.Info("operation cleanup stopping") c.cancel() c.wg.Wait() - c.logger.Info("operation cleanup stopped") + log.Info("operation cleanup stopped") } // cleanupLoop runs periodic cleanup. @@ -106,18 +103,20 @@ func (c *OperationCleanup) runCleanup() { ctx, cancel := context.WithTimeout(c.ctx, TimeoutMaintenance) defer cancel() + log := logging.FromContext(ctx).WithWorker("operation-cleanup") + cutoff := time.Now().Add(-c.retentionPeriod) deleted, err := c.repo.DeleteOlderThan(ctx, cutoff) if err != nil { - c.logger.Error("failed to cleanup old operations", - "error", err, + log.Error("failed to cleanup old operations", + logging.FieldError, err, "cutoff", cutoff, ) return } if deleted > 0 { - c.logger.Info("cleaned up old operations", + log.Info("cleaned up old operations", "deleted", deleted, "cutoff", cutoff, ) diff --git a/internal/worker/pod_git_operations.go b/internal/worker/pod_git_operations.go index a24c1bf..fc26167 100644 --- a/internal/worker/pod_git_operations.go +++ b/internal/worker/pod_git_operations.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "fmt" - "log/slog" "os/exec" "strings" + + "github.com/orchard9/rdev/internal/logging" ) // PodGitOperations provides git operations that run inside a Kubernetes pod @@ -17,7 +18,6 @@ type PodGitOperations struct { giteaToken string gitUser string gitEmail string - logger *slog.Logger } // PodGitOperationsConfig configures pod git operations. @@ -33,8 +33,6 @@ type PodGitOperationsConfig struct { // GitEmail is the git commit author email. GitEmail string - - Logger *slog.Logger } // NewPodGitOperations creates a new pod git operations helper. @@ -45,15 +43,11 @@ func NewPodGitOperations(cfg PodGitOperationsConfig) *PodGitOperations { if cfg.GitEmail == "" { cfg.GitEmail = "worker@threesix.ai" } - if cfg.Logger == nil { - cfg.Logger = slog.Default() - } return &PodGitOperations{ namespace: cfg.Namespace, giteaToken: cfg.GiteaToken, gitUser: cfg.GitUser, gitEmail: cfg.GitEmail, - logger: cfg.Logger.With("component", "pod-git-ops"), } } @@ -87,6 +81,7 @@ func (g *PodGitOperations) IsGitRepo(ctx context.Context, podName, workDir strin // If the workspace already contains a git repo, it pulls the latest changes instead. // If the workspace exists but is not a git repo, it clears the directory first. func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, cloneURL string) *CloneResult { + log := logging.FromContext(ctx).WithWorker("pod-git-ops") result := &CloneResult{} if cloneURL == "" { @@ -103,24 +98,24 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon // Normalize URLs for comparison (both should be HTTPS) if err == nil && currentRemote == expectedURL { - g.logger.Info("workspace is already a git repo with correct remote, pulling latest", - "pod", podName, + log.Info("workspace is already a git repo with correct remote, pulling latest", + logging.FieldPodName, podName, "workDir", workDir, ) // Pull latest changes if err := g.runGitInPod(ctx, podName, workDir, "pull", "--ff-only"); err != nil { // Pull failed, but repo exists - not fatal, might have local changes - g.logger.Warn("git pull failed, continuing with existing state", - "pod", podName, - "error", err, + log.Warn("git pull failed, continuing with existing state", + logging.FieldPodName, podName, + logging.FieldError, err, ) } return result } // Remote doesn't match - this is a different project's repo - g.logger.Info("workspace has different git remote, will re-clone", - "pod", podName, + log.Info("workspace has different git remote, will re-clone", + logging.FieldPodName, podName, "workDir", workDir, "currentRemote", currentRemote, "expectedURL", expectedURL, @@ -129,8 +124,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon // Check if directory exists but is not a git repo - clear it first if g.dirExists(ctx, podName, workDir) { - g.logger.Info("workspace exists but is not a git repo, clearing", - "pod", podName, + log.Info("workspace exists but is not a git repo, clearing", + logging.FieldPodName, podName, "workDir", workDir, ) // Clear the directory contents (but keep the directory itself) @@ -140,9 +135,9 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon } cmd := exec.CommandContext(ctx, "kubectl", clearArgs...) if err := cmd.Run(); err != nil { - g.logger.Warn("failed to clear workspace, attempting clone anyway", - "pod", podName, - "error", err, + log.Warn("failed to clear workspace, attempting clone anyway", + logging.FieldPodName, podName, + logging.FieldError, err, ) } } @@ -155,8 +150,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon authCloneURL = strings.Replace(cloneURL, "https://", "https://token:"+g.giteaToken+"@", 1) } - g.logger.Info("cloning repository", - "pod", podName, + log.Info("cloning repository", + logging.FieldPodName, podName, "workDir", workDir, "url", cloneURL, // Log without token ) @@ -178,8 +173,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon } result.Cloned = true - g.logger.Info("repository cloned successfully", - "pod", podName, + log.Info("repository cloned successfully", + logging.FieldPodName, podName, "workDir", workDir, ) @@ -205,6 +200,7 @@ func (g *PodGitOperations) dirExists(ctx context.Context, podName, path string) // // This is the programmatic alternative to relying on LLMs for git operations. func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, message string, push bool) *PostBuildResult { + log := logging.FromContext(ctx).WithWorker("pod-git-ops") result := &PostBuildResult{} // Configure git user for commits @@ -224,7 +220,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, return result } if strings.TrimSpace(status) == "" { - g.logger.Info("no changes to commit", "pod", podName, "workDir", workDir) + log.Info("no changes to commit", logging.FieldPodName, podName, "workDir", workDir) return result } result.HasChanges = true @@ -261,8 +257,8 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, } result.CommitSHA = strings.TrimSpace(sha) - g.logger.Info("committed changes", - "pod", podName, + log.Info("committed changes", + logging.FieldPodName, podName, "sha", result.CommitSHA, "files", len(result.FilesChanged), ) @@ -275,7 +271,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, // This avoids putting the token in the URL which would be visible in logs credHelper := fmt.Sprintf("!f() { echo username=token; echo password=%s; }; f", g.giteaToken) if err := g.runGitInPod(ctx, podName, workDir, "config", "credential.helper", credHelper); err != nil { - g.logger.Warn("failed to configure credential helper", "error", err) + log.Warn("failed to configure credential helper", logging.FieldError, err) // Continue anyway - push might still work if pod has other auth configured } } @@ -285,7 +281,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, return result } result.Pushed = true - g.logger.Info("pushed changes", "pod", podName, "sha", result.CommitSHA) + log.Info("pushed changes", logging.FieldPodName, podName, "sha", result.CommitSHA) } return result diff --git a/internal/worker/queue_maintenance.go b/internal/worker/queue_maintenance.go index 0639d36..c0c52af 100644 --- a/internal/worker/queue_maintenance.go +++ b/internal/worker/queue_maintenance.go @@ -2,11 +2,11 @@ package worker import ( "context" - "log/slog" "sync" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" ) @@ -18,7 +18,6 @@ type QueueMaintenance struct { queue port.WorkQueue registry port.WorkerRegistry buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks - logger *slog.Logger // Intervals staleTaskTimeout time.Duration @@ -57,8 +56,6 @@ type QueueMaintenanceConfig struct { // BuildAudit syncs build audit status when requeuing stale tasks. // If nil, build audit is not updated (legacy behavior). BuildAudit port.BuildAudit - - Logger *slog.Logger } // DefaultQueueMaintenanceConfig returns sensible defaults. @@ -69,7 +66,6 @@ func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig { CleanupAge: 7 * 24 * time.Hour, MaintenancePeriod: 1 * time.Minute, MetricsPeriod: 15 * time.Second, - Logger: slog.Default(), } } @@ -89,7 +85,6 @@ func NewQueueMaintenance( queue: queue, registry: registry, buildAudit: cfg.BuildAudit, - logger: cfg.Logger.With("component", "queue-maintenance"), staleTaskTimeout: cfg.StaleTaskTimeout, staleWorkerTimeout: cfg.StaleWorkerTimeout, cleanupAge: cfg.CleanupAge, @@ -102,7 +97,8 @@ func NewQueueMaintenance( // Start begins the maintenance and metrics loops. func (m *QueueMaintenance) Start() { - m.logger.Info("queue maintenance started", + log := logging.FromContext(m.ctx).WithWorker("queue-maintenance") + log.Info("queue maintenance started", "maintenance_period", m.maintenancePeriod, "metrics_period", m.metricsPeriod, "stale_task_timeout", m.staleTaskTimeout, @@ -117,10 +113,11 @@ func (m *QueueMaintenance) Start() { // Stop gracefully shuts down the maintenance worker. func (m *QueueMaintenance) Stop() { - m.logger.Info("queue maintenance stopping") + log := logging.FromContext(m.ctx).WithWorker("queue-maintenance") + log.Info("queue maintenance stopping") m.cancel() m.wg.Wait() - m.logger.Info("queue maintenance stopped") + log.Info("queue maintenance stopped") } // maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup. @@ -177,26 +174,28 @@ func (m *QueueMaintenance) runMaintenance() { // (the worker likely crashed without reporting). // Also syncs build audit to pending status if configured. func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) { + log := logging.FromContext(ctx).WithWorker("queue-maintenance") + // Use RequeueStaleWithIDs to get task IDs for build audit sync taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout) if err != nil { - m.logger.Warn("failed to requeue stale tasks", "error", err) + log.Warn("failed to requeue stale tasks", logging.FieldError, err) return } if len(taskIDs) == 0 { return } - m.logger.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs) + log.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs) // Sync build audit status if configured if m.buildAudit != nil { for _, taskID := range taskIDs { // Update build audit to pending (worker assignment cleared) if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil { - m.logger.Warn("failed to sync build audit for requeued task", + log.Warn("failed to sync build audit for requeued task", "task_id", taskID, - "error", err, + logging.FieldError, err, ) } } @@ -205,25 +204,29 @@ func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) { // markStaleWorkers marks workers without recent heartbeats as offline. func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) { + log := logging.FromContext(ctx).WithWorker("queue-maintenance") + count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout) if err != nil { - m.logger.Warn("failed to mark stale workers offline", "error", err) + log.Warn("failed to mark stale workers offline", logging.FieldError, err) return } if count > 0 { - m.logger.Info("marked stale workers offline", "count", count) + log.Info("marked stale workers offline", "count", count) } } // cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age. func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) { + log := logging.FromContext(ctx).WithWorker("queue-maintenance") + count, err := m.queue.CleanupOld(ctx, m.cleanupAge) if err != nil { - m.logger.Warn("failed to cleanup old tasks", "error", err) + log.Warn("failed to cleanup old tasks", logging.FieldError, err) return } if count > 0 { - m.logger.Info("cleaned up old tasks", "count", count) + log.Info("cleaned up old tasks", "count", count) } } @@ -232,9 +235,11 @@ func (m *QueueMaintenance) refreshMetrics() { ctx, cancel := context.WithTimeout(m.ctx, TimeoutQuickOp) defer cancel() + log := logging.FromContext(ctx).WithWorker("queue-maintenance") + stats, err := m.queue.GetStats(ctx) if err != nil { - m.logger.Warn("failed to get queue stats for metrics", "error", err) + log.Warn("failed to get queue stats for metrics", logging.FieldError, err) return } @@ -247,7 +252,7 @@ func (m *QueueMaintenance) refreshMetrics() { // Worker counts workers, err := m.registry.List(ctx, port.WorkerFilter{}) if err != nil { - m.logger.Warn("failed to list workers for metrics", "error", err) + log.Warn("failed to list workers for metrics", logging.FieldError, err) return } diff --git a/internal/worker/queue_maintenance_test.go b/internal/worker/queue_maintenance_test.go index 774994f..cd6ce3e 100644 --- a/internal/worker/queue_maintenance_test.go +++ b/internal/worker/queue_maintenance_test.go @@ -3,7 +3,6 @@ package worker import ( "context" "fmt" - "log/slog" "sync" "testing" "time" @@ -212,7 +211,6 @@ func TestQueueMaintenance_RunMaintenance(t *testing.T) { CleanupAge: 7 * 24 * time.Hour, MaintenancePeriod: 1 * time.Hour, // won't fire in test MetricsPeriod: 1 * time.Hour, // won't fire in test - Logger: slog.Default(), } m := NewQueueMaintenance(queue, registry, cfg) @@ -246,7 +244,6 @@ func TestQueueMaintenance_RefreshMetrics(t *testing.T) { CleanupAge: 7 * 24 * time.Hour, MaintenancePeriod: 1 * time.Hour, MetricsPeriod: 1 * time.Hour, - Logger: slog.Default(), } m := NewQueueMaintenance(queue, registry, cfg) @@ -271,7 +268,6 @@ func TestQueueMaintenance_StartStop(t *testing.T) { CleanupAge: 7 * 24 * time.Hour, MaintenancePeriod: 50 * time.Millisecond, MetricsPeriod: 50 * time.Millisecond, - Logger: slog.Default(), } m := NewQueueMaintenance(queue, registry, cfg) diff --git a/internal/worker/queue_processor.go b/internal/worker/queue_processor.go index 0c36e6b..3e97436 100644 --- a/internal/worker/queue_processor.go +++ b/internal/worker/queue_processor.go @@ -4,12 +4,12 @@ package worker import ( "context" "encoding/json" - "log/slog" "strings" "sync" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" ) @@ -20,7 +20,6 @@ type QueueProcessor struct { projects port.ProjectRepository streams port.StreamPublisher webhookDispatcher port.WebhookDispatcher - logger *slog.Logger pollPeriod time.Duration // Shutdown management @@ -36,14 +35,12 @@ type QueueProcessor struct { // QueueProcessorConfig holds configuration for the queue processor. type QueueProcessorConfig struct { PollPeriod time.Duration - Logger *slog.Logger } // DefaultQueueProcessorConfig returns sensible defaults. func DefaultQueueProcessorConfig() *QueueProcessorConfig { return &QueueProcessorConfig{ PollPeriod: 5 * time.Second, - Logger: slog.Default(), } } @@ -66,7 +63,6 @@ func NewQueueProcessor( executor: executor, projects: projects, streams: streams, - logger: cfg.Logger, pollPeriod: cfg.PollPeriod, ctx: ctx, cancel: cancel, @@ -83,7 +79,8 @@ func (p *QueueProcessor) WithWebhookDispatcher(dispatcher port.WebhookDispatcher // Start begins processing the command queue. // It spawns a worker for each known project. func (p *QueueProcessor) Start() error { - p.logger.Info("queue processor starting") + log := logging.FromContext(p.ctx).WithWorker("queue-processor") + log.Info("queue processor starting") // Start the main coordinator that manages per-project workers p.wg.Add(1) @@ -94,16 +91,18 @@ func (p *QueueProcessor) Start() error { // Stop gracefully shuts down the queue processor. func (p *QueueProcessor) Stop() { - p.logger.Info("queue processor stopping") + log := logging.FromContext(p.ctx).WithWorker("queue-processor") + log.Info("queue processor stopping") p.cancel() p.wg.Wait() - p.logger.Info("queue processor stopped") + log.Info("queue processor stopped") } // coordinator manages per-project workers, starting new ones as projects are discovered. func (p *QueueProcessor) coordinator() { defer p.wg.Done() + log := logging.FromContext(p.ctx).WithWorker("queue-processor") ticker := time.NewTicker(p.pollPeriod) defer ticker.Stop() @@ -116,7 +115,7 @@ func (p *QueueProcessor) coordinator() { // Stop all project workers p.projectMu.Lock() for projectID, cancel := range p.projectWorkers { - p.logger.Debug("stopping worker", "project", projectID) + log.Debug("stopping worker", "project", projectID) cancel() } p.projectMu.Unlock() @@ -129,9 +128,11 @@ func (p *QueueProcessor) coordinator() { // refreshProjectWorkers ensures each known project has a worker. func (p *QueueProcessor) refreshProjectWorkers() { + log := logging.FromContext(p.ctx).WithWorker("queue-processor") + projects, err := p.projects.List(p.ctx) if err != nil { - p.logger.Warn("failed to list projects for queue processing", "error", err) + log.Warn("failed to list projects for queue processing", logging.FieldError, err) return } @@ -146,7 +147,7 @@ func (p *QueueProcessor) refreshProjectWorkers() { p.projectWorkers[projectID] = workerCancel p.wg.Add(1) go p.projectWorker(workerCtx, projectID) - p.logger.Info("started queue worker", "project", projectID) + log.Info("started queue worker", "project", projectID) } } @@ -168,7 +169,7 @@ func (p *QueueProcessor) projectWorker(ctx context.Context, projectID string) { case <-ticker.C: // Try to dequeue and process a command if err := p.processNextCommand(ctx, projectID); err != nil { - p.logger.Warn("error processing command", "project", projectID, "error", err) + logging.FromContext(ctx).WithWorker("queue-processor").Warn("error processing command", "project", projectID, "error", err) } } } @@ -185,7 +186,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin return nil // No commands pending } - p.logger.Info("processing queued command", + logging.FromContext(ctx).WithWorker("queue-processor").Info("processing queued command", "command_id", cmd.ID, "project", projectID, "type", cmd.CommandType, @@ -294,7 +295,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin // Update command status if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil { - p.logger.Warn("failed to update command status", "command_id", cmd.ID, "error", err) + logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to update command status", "command_id", cmd.ID, "error", err) } // Publish completion event @@ -327,7 +328,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin Error: queueResult.Error, }) - p.logger.Info("completed queued command", + logging.FromContext(ctx).WithWorker("queue-processor").Info("completed queued command", "command_id", cmd.ID, "project", projectID, "status", finalStatus, @@ -357,7 +358,7 @@ func (p *QueueProcessor) dispatchWebhookEvent(ctx context.Context, projectID str } if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { - p.logger.Warn("failed to dispatch webhook event", + logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to dispatch webhook event", "project_id", projectID, "event_type", eventType, "error", err, diff --git a/internal/worker/sdlc_executor.go b/internal/worker/sdlc_executor.go index e90eb33..05eec6b 100644 --- a/internal/worker/sdlc_executor.go +++ b/internal/worker/sdlc_executor.go @@ -5,12 +5,12 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "os/exec" "strings" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" ) // SDLCTaskExecutor handles WorkTaskTypeSDLC tasks by executing sdlc CLI commands @@ -19,7 +19,6 @@ import ( type SDLCTaskExecutor struct { podGitOps *PodGitOperations namespace string - logger *slog.Logger } // SDLCTaskExecutorConfig holds configuration for the SDLC task executor. @@ -29,20 +28,13 @@ type SDLCTaskExecutorConfig struct { // PodGitOps provides git clone/commit/push operations. PodGitOps *PodGitOperations - - Logger *slog.Logger } // NewSDLCTaskExecutor creates a new SDLC task executor. func NewSDLCTaskExecutor(cfg SDLCTaskExecutorConfig) *SDLCTaskExecutor { - logger := cfg.Logger - if logger == nil { - logger = slog.Default() - } return &SDLCTaskExecutor{ podGitOps: cfg.PodGitOps, namespace: cfg.Namespace, - logger: logger.With("component", "sdlc-task-executor"), } } @@ -50,6 +42,7 @@ func NewSDLCTaskExecutor(cfg SDLCTaskExecutorConfig) *SDLCTaskExecutor { // and optionally committing/pushing changes. func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { start := time.Now() + log := logging.FromContext(ctx).WithWorker("sdlc-task-executor") spec, err := e.parseSpec(task.Spec) if err != nil { @@ -66,11 +59,11 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) * // Working directory in the pod workDir := "/workspace" - e.logger.Info("executing SDLC task", + log.Info("executing SDLC task", "task_id", task.ID, - "project_id", task.ProjectID, + logging.FieldProjectID, task.ProjectID, "command", spec.Command, - "pod", podName, + logging.FieldPodName, podName, ) // 1. Clone repo to worker pod @@ -93,9 +86,9 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) * // 2. Ensure .sdlc/ is initialized (auto-init for skeleton projects) if err := e.ensureSDLCInit(ctx, podName, workDir); err != nil { - e.logger.Warn("sdlc init check failed, continuing anyway", + log.Warn("sdlc init check failed, continuing anyway", "task_id", task.ID, - "error", err, + logging.FieldError, err, ) } @@ -128,7 +121,7 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) * if gitResult.HasChanges { result.CommitSHA = gitResult.CommitSHA result.FilesChanged = gitResult.FilesChanged - e.logger.Info("SDLC changes committed", + log.Info("SDLC changes committed", "task_id", task.ID, "commit", gitResult.CommitSHA, "files", len(gitResult.FilesChanged), @@ -137,10 +130,10 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) * } } - e.logger.Info("SDLC task completed", + log.Info("SDLC task completed", "task_id", task.ID, "command", spec.Command, - "duration_ms", result.DurationMs, + logging.FieldDuration, result.DurationMs, ) return result @@ -149,6 +142,8 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) * // ensureSDLCInit checks if .sdlc/ exists and runs `sdlc init` if it doesn't. // This enables SDLC operations on skeleton projects that don't have .sdlc/ pre-initialized. func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir string) error { + log := logging.FromContext(ctx).WithWorker("sdlc-task-executor") + // Check if .sdlc/ directory exists checkArgs := []string{ "exec", "-n", e.namespace, podName, "--", @@ -169,7 +164,7 @@ func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir } // Run sdlc init - e.logger.Info("initializing .sdlc directory", "pod", podName, "workDir", workDir) + log.Info("initializing .sdlc directory", logging.FieldPodName, podName, "workDir", workDir) initArgs := []string{ "exec", "-n", e.namespace, podName, "--", @@ -186,7 +181,7 @@ func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir return fmt.Errorf("sdlc init failed: %w: %s", err, initStderr.String()) } - e.logger.Info("sdlc initialized", "pod", podName, "output", initStdout.String()) + log.Info("sdlc initialized", logging.FieldPodName, podName, "output", initStdout.String()) return nil } diff --git a/internal/worker/verify_executor.go b/internal/worker/verify_executor.go index 12901cf..422310c 100644 --- a/internal/worker/verify_executor.go +++ b/internal/worker/verify_executor.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "strings" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" ) @@ -28,9 +28,8 @@ const ( type VerifyExecutor struct { cmdExecutor port.CommandExecutor // kubectl exec wrapper streams port.StreamPublisher // SSE stream publisher for real-time events - logger *slog.Logger - namespace string // Kubernetes namespace for the pod - podName string // Playwright pod name (e.g., "playwright-0") + namespace string // Kubernetes namespace for the pod + podName string // Playwright pod name (e.g., "playwright-0") } // VerifyExecutorConfig holds configuration for the verify executor. @@ -43,12 +42,8 @@ type VerifyExecutorConfig struct { func NewVerifyExecutor( cmdExecutor port.CommandExecutor, streams port.StreamPublisher, - logger *slog.Logger, cfg *VerifyExecutorConfig, ) *VerifyExecutor { - if logger == nil { - logger = slog.Default() - } if cfg == nil { cfg = &VerifyExecutorConfig{ Namespace: "rdev", @@ -64,7 +59,6 @@ func NewVerifyExecutor( return &VerifyExecutor{ cmdExecutor: cmdExecutor, streams: streams, - logger: logger.With("component", "verify-executor"), namespace: cfg.Namespace, podName: cfg.PodName, } @@ -72,6 +66,7 @@ func NewVerifyExecutor( // Execute runs a verify task by capturing screenshots/video of a URL. func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { + log := logging.FromContext(ctx).WithWorker("verify-executor") start := time.Now() streamID := task.ID // Use task ID as stream ID for SSE @@ -109,7 +104,7 @@ func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *do "viewports": spec.Viewports, }) - v.logger.Info("executing verify capture", + log.Info("executing verify capture", "task_id", task.ID, "project_id", task.ProjectID, "url", spec.URL, diff --git a/internal/worker/verify_executor_test.go b/internal/worker/verify_executor_test.go index 2c793cf..a236c6c 100644 --- a/internal/worker/verify_executor_test.go +++ b/internal/worker/verify_executor_test.go @@ -16,7 +16,7 @@ func TestVerifyExecutor_Execute_Success(t *testing.T) { {Stream: "stdout", Line: `{"screenshots":{"1920x1080":"/captures/task-1/1920_1080.png","375x667":"/captures/task-1/375_667.png"},"video":"/captures/task-1/recording.webm"}`, Timestamp: time.Now()}, } - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -51,7 +51,7 @@ func TestVerifyExecutor_Execute_Success(t *testing.T) { func TestVerifyExecutor_Execute_URLRequired(t *testing.T) { cmdExec := newMockCommandExecutor() - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -72,7 +72,7 @@ func TestVerifyExecutor_Execute_URLRequired(t *testing.T) { func TestVerifyExecutor_Execute_InvalidURL(t *testing.T) { cmdExec := newMockCommandExecutor() - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -94,7 +94,7 @@ func TestVerifyExecutor_Execute_CaptureFailure(t *testing.T) { cmdExec := newMockCommandExecutor() cmdExec.err = fmt.Errorf("kubectl exec failed: connection refused") - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -122,7 +122,7 @@ func TestVerifyExecutor_Execute_NonZeroExitCode(t *testing.T) { DurationMs: 100, } - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -146,7 +146,7 @@ func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) { {Stream: "stdout", Line: "not valid json", Timestamp: time.Now()}, } - exec := NewVerifyExecutor(cmdExec, nil, nil, nil) + exec := NewVerifyExecutor(cmdExec, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -165,7 +165,7 @@ func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) { } func TestVerifyExecutor_ParseSpec(t *testing.T) { - exec := NewVerifyExecutor(nil, nil, nil, nil) + exec := NewVerifyExecutor(nil, nil, nil) t.Run("valid spec with all fields", func(t *testing.T) { spec, err := exec.parseSpec(map[string]any{ @@ -238,7 +238,7 @@ func TestVerifyExecutor_ParseSpec(t *testing.T) { } func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) { - exec := NewVerifyExecutor(nil, nil, nil, nil) + exec := NewVerifyExecutor(nil, nil, nil) spec := &domain.VerifySpec{ URL: "https://example.com/page", @@ -309,7 +309,7 @@ func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) { func TestVerifyExecutor_Config(t *testing.T) { t.Run("default config", func(t *testing.T) { - exec := NewVerifyExecutor(nil, nil, nil, nil) + exec := NewVerifyExecutor(nil, nil, nil) if exec.namespace != "rdev" { t.Errorf("namespace = %q, want 'rdev'", exec.namespace) } @@ -319,7 +319,7 @@ func TestVerifyExecutor_Config(t *testing.T) { }) t.Run("custom config", func(t *testing.T) { - exec := NewVerifyExecutor(nil, nil, nil, &VerifyExecutorConfig{ + exec := NewVerifyExecutor(nil, nil, &VerifyExecutorConfig{ Namespace: "custom-ns", PodName: "custom-pod-0", }) diff --git a/internal/worker/work_executor.go b/internal/worker/work_executor.go index 405a4a8..492e427 100644 --- a/internal/worker/work_executor.go +++ b/internal/worker/work_executor.go @@ -4,13 +4,13 @@ package worker import ( "context" "fmt" - "log/slog" "os" "sync" "sync/atomic" "time" "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/service" ) @@ -24,7 +24,6 @@ type WorkExecutor struct { buildExec *BuildExecutor verifyExec *VerifyExecutor sdlcExec *SDLCTaskExecutor - logger *slog.Logger workerID string hostname string @@ -61,8 +60,6 @@ type WorkExecutorConfig struct { // TaskTimeout is the maximum time a single task may run. // Default: 15 minutes. TaskTimeout time.Duration - - Logger *slog.Logger } // DefaultWorkExecutorConfig returns sensible defaults. @@ -77,7 +74,6 @@ func DefaultWorkExecutorConfig() *WorkExecutorConfig { PollPeriod: 5 * time.Second, HeartbeatPeriod: 30 * time.Second, TaskTimeout: 15 * time.Minute, - Logger: slog.Default(), } } @@ -117,7 +113,6 @@ func NewWorkExecutor( buildExec: buildExec, verifyExec: verifyExec, sdlcExec: sdlcExec, - logger: cfg.Logger.With("component", "work-executor"), workerID: cfg.WorkerID, hostname: hostname, version: cfg.Version, @@ -136,6 +131,8 @@ func (e *WorkExecutor) Start() error { return fmt.Errorf("executor already started") } + log := logging.FromContext(e.ctx).WithWorker("work-executor") + // Register this worker in the pool worker := &domain.Worker{ ID: e.workerID, @@ -147,7 +144,7 @@ func (e *WorkExecutor) Start() error { return err } - e.logger.Info("work executor started", + log.Info("work executor started", "worker_id", e.workerID, "poll_period", e.pollPeriod, "heartbeat_period", e.hbPeriod, @@ -166,7 +163,8 @@ func (e *WorkExecutor) Start() error { // Stop gracefully shuts down the executor. func (e *WorkExecutor) Stop() { - e.logger.Info("work executor stopping", "worker_id", e.workerID) + log := logging.FromContext(e.ctx).WithWorker("work-executor") + log.Info("work executor stopping", "worker_id", e.workerID) e.cancel() e.wg.Wait() @@ -174,10 +172,10 @@ func (e *WorkExecutor) Stop() { ctx, cancel := context.WithTimeout(context.Background(), TimeoutQuickOp) defer cancel() if err := e.workerSvc.Deregister(ctx, e.workerID); err != nil { - e.logger.Warn("failed to deregister worker", "error", err) + logging.FromContext(ctx).WithWorker("work-executor").Warn("failed to deregister worker", "error", err) } - e.logger.Info("work executor stopped", "worker_id", e.workerID) + log.Info("work executor stopped", "worker_id", e.workerID) } // WorkerID returns the executor's worker ID. @@ -194,6 +192,7 @@ func (e *WorkExecutor) Running() bool { func (e *WorkExecutor) heartbeatLoop() { defer e.wg.Done() + log := logging.FromContext(e.ctx).WithWorker("work-executor") ticker := time.NewTicker(e.hbPeriod) defer ticker.Stop() @@ -203,7 +202,7 @@ func (e *WorkExecutor) heartbeatLoop() { return case <-ticker.C: if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil { - e.logger.Warn("heartbeat failed", "error", err) + log.Warn("heartbeat failed", "error", err) } } } @@ -228,16 +227,18 @@ func (e *WorkExecutor) pollLoop() { // tryClaimAndExecute attempts to claim a task and execute it. func (e *WorkExecutor) tryClaimAndExecute() { + log := logging.FromContext(e.ctx).WithWorker("work-executor") + task, err := e.workerSvc.ClaimTask(e.ctx, e.workerID) if err != nil { - e.logger.Warn("failed to claim task", "error", err) + log.Warn("failed to claim task", "error", err) return } if task == nil { return // No tasks available } - e.logger.Info("executing task", + log.Info("executing task", "task_id", task.ID, "project_id", task.ProjectID, "type", task.Type, @@ -257,7 +258,7 @@ func (e *WorkExecutor) tryClaimAndExecute() { if result.Success { if err := e.workerSvc.CompleteTask(e.ctx, e.workerID, task.ID, result); err != nil { - e.logger.Error("failed to complete task", + log.Error("failed to complete task", "task_id", task.ID, "error", err, ) @@ -269,7 +270,7 @@ func (e *WorkExecutor) tryClaimAndExecute() { } if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil { - e.logger.Error("failed to record task failure", + log.Error("failed to record task failure", "task_id", task.ID, "error", err, ) diff --git a/internal/worker/work_executor_test.go b/internal/worker/work_executor_test.go index e20d03a..9c8f537 100644 --- a/internal/worker/work_executor_test.go +++ b/internal/worker/work_executor_test.go @@ -3,21 +3,12 @@ package worker import ( "context" "fmt" - "log/slog" "testing" "time" "github.com/orchard9/rdev/internal/domain" ) -// ============================================================================= -// WorkExecutor Tests -// ============================================================================= - -func testLogger() *slog.Logger { - return slog.Default() -} - func TestWorkExecutor_StartAndStop(t *testing.T) { deps := newTestDeps() @@ -25,7 +16,6 @@ func TestWorkExecutor_StartAndStop(t *testing.T) { WorkerID: "test-worker-1", PollPeriod: 100 * time.Millisecond, HeartbeatPeriod: 100 * time.Millisecond, - Logger: testLogger(), }) if err := executor.Start(); err != nil { @@ -79,7 +69,6 @@ func TestWorkExecutor_ClaimsAndExecutesTask(t *testing.T) { WorkerID: "test-worker-2", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second, - Logger: testLogger(), }) // Register the worker (normally done by Start) then call tryClaimAndExecute directly @@ -122,7 +111,6 @@ func TestWorkExecutor_FailsTaskOnAgentError(t *testing.T) { WorkerID: "test-worker-3", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second, - Logger: testLogger(), }) if err := executor.Start(); err != nil { @@ -168,7 +156,6 @@ func TestWorkExecutor_UnsupportedTaskType(t *testing.T) { WorkerID: "test-worker-4", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second, - Logger: testLogger(), }) if err := executor.Start(); err != nil { @@ -200,7 +187,7 @@ func TestBuildExecutor_Execute(t *testing.T) { result: &domain.AgentResult{ExitCode: 0, DurationMs: 500}, } registry := &mockCodeAgentRegistry{agent: agent} - exec := NewBuildExecutor(registry, nil, nil, nil, nil) + exec := NewBuildExecutor(registry, nil, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -220,7 +207,7 @@ func TestBuildExecutor_Execute(t *testing.T) { t.Run("missing prompt", func(t *testing.T) { registry := &mockCodeAgentRegistry{agent: &mockCodeAgent{}} - exec := NewBuildExecutor(registry, nil, nil, nil, nil) + exec := NewBuildExecutor(registry, nil, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -236,7 +223,7 @@ func TestBuildExecutor_Execute(t *testing.T) { t.Run("no agent available", func(t *testing.T) { registry := &mockCodeAgentRegistry{agent: nil} - exec := NewBuildExecutor(registry, nil, nil, nil, nil) + exec := NewBuildExecutor(registry, nil, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -253,7 +240,7 @@ func TestBuildExecutor_Execute(t *testing.T) { t.Run("agent execution error", func(t *testing.T) { agent := &mockCodeAgent{err: fmt.Errorf("connection refused")} registry := &mockCodeAgentRegistry{agent: agent} - exec := NewBuildExecutor(registry, nil, nil, nil, nil) + exec := NewBuildExecutor(registry, nil, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -275,7 +262,7 @@ func TestBuildExecutor_Execute(t *testing.T) { result: &domain.AgentResult{ExitCode: 1, DurationMs: 500}, } registry := &mockCodeAgentRegistry{agent: agent} - exec := NewBuildExecutor(registry, nil, nil, nil, nil) + exec := NewBuildExecutor(registry, nil, nil, nil) task := &domain.WorkTask{ ID: "task-1", @@ -291,7 +278,7 @@ func TestBuildExecutor_Execute(t *testing.T) { } func TestBuildExecutor_ParseSpec(t *testing.T) { - exec := NewBuildExecutor(nil, nil, nil, nil, nil) + exec := NewBuildExecutor(nil, nil, nil, nil) t.Run("valid spec", func(t *testing.T) { spec, err := exec.parseSpec(map[string]any{ diff --git a/slack-preparation-thoughts-2.md b/slack-preparation-thoughts-2.md deleted file mode 100644 index 5a32b8d..0000000 --- a/slack-preparation-thoughts-2.md +++ /dev/null @@ -1,75 +0,0 @@ -# Slack Preparation Phase 2 (Execution Plan) - -> Based on `slack-preparation-thoughts.md`, this plan details the specific files and commands needed to enable `slackpath-1` (Identity) and `slackpath-2` (Async Workers). - -## 1. Missing Templates (Priority: High) - -We must implement these templates in `internal/adapter/templates/templates/` so the Tree Runner can execute `add-worker`, `add-db`, and `add-redis`. - -### `worker` Template -* **Location:** `internal/adapter/templates/templates/worker` -* **Structure:** Go service similar to `go-api` but optimized for long-running tasks. -* **Key File:** `cmd/worker/main.go` (starts a queue consumer instead of an HTTP server). -* **Dockerfile:** Standard Go build. -* **Helm Chart:** Deployment (not Service/Ingress). - -### `postgres` Template -* **Location:** `internal/adapter/templates/templates/postgres` -* **Nature:** Infrastructure Wrapper. -* **Helm Chart:** Dependencies on `bitnami/postgresql` or a custom StatefulSet. -* **Outputs:** Connection string secret. - -### `redis` Template -* **Location:** `internal/adapter/templates/templates/redis` -* **Nature:** Infrastructure Wrapper. -* **Helm Chart:** Dependencies on `bitnami/redis`. -* **Outputs:** Host/Port secrets. - -## 2. Missing Shared Packages (Priority: Critical) - -The agent needs standard libraries to avoid writing insecure auth or buggy queues from scratch. These go in `pkg/` at the root of the project templates (or the `skeleton`). - -### `pkg/auth` (for Path 1) -* **JWT Handling:** `Sign(claims) string`, `Parse(token) (*Claims, error)`. -* **Middleware:** `func RequireAuth(next http.Handler) http.Handler`. -* **Context:** `func GetUser(ctx) User`. - -### `pkg/queue` (for Path 2) -* **Interface:** - ```go - type JobQueue interface { - Enqueue(ctx context.Context, topic string, payload any) error - Process(ctx context.Context, topic string, handler func(payload []byte) error) - } - ``` -* **Implementation:** Redis-based (using `go-redis/v9`). - -## 3. SDLC Commands (Priority: Critical) - -The cookbooks use commands that don't exist. We must create the prompt definitions. - -### `.claude/commands/spec-feature.md` -* **Goal:** Generate `spec.md`. -* **Instructions:** "You are a Technical PM. Analyze the feature slug. Write a requirements doc." - -### `.claude/commands/design-feature.md` -* **Goal:** Generate `design.md`. -* **Instructions:** "You are a System Architect. Analyze `spec.md`. Define DB Schema, API Contracts, and Package structure." - -### `.claude/commands/implement-feature.md` -* **Goal:** Write Code. -* **Instructions:** "You are a Senior Go Developer. Read `design.md`. Implement the changes. Run tests." - -## 4. API Verification (Priority: Medium) - -We need to ensure `rdev-api` actually handles the `POST /projects/{id}/builds` endpoint with the `prompt` payload correctly (routing it to the Claude Code agent). - -* **Check:** `cmd/rdev-api/internal/service/build_service.go` (or similar). -* **Verify:** Does it invoke the `claude` CLI inside the project pod? - -## Execution Order - -1. **Templates:** Create `worker`, `postgres`, `redis` folders. -2. **Packages:** Write `pkg/auth` and `pkg/queue` code to be included in the `skeleton` template. -3. **Commands:** Create the 3 markdown files in `.claude/commands/`. -4. **Test:** Run `slackpath-1`. diff --git a/slack-preparation-thoughts.md b/slack-preparation-thoughts.md deleted file mode 100644 index a3cf10b..0000000 --- a/slack-preparation-thoughts.md +++ /dev/null @@ -1,67 +0,0 @@ -# Slack Preparation Analysis - -To successfully build a Slack-like distributed system using the `rdev` agentic workflow, we must bridge the gap between our current capabilities and the requirements of the `slackpath-*` cookbooks. - -## 1. Success Enablers (What we have) - -* **`pkg/api` Chassis**: A solid Go service foundation (`App` struct) that handles routing, logging (`slog`), middleware, and graceful shutdown. This allows us to spin up consistent APIs quickly. -* **`go-api` Template**: A standardized backend service template in `internal/adapter/templates/templates/go-api` that likely uses the chassis. -* **`skeleton` Template**: A monorepo base to hold multiple services. -* **Tree Runner**: A robust orchestration engine (`tree-runner.sh`) that can execute complex workflows, manage state, and verify deployments. -* **Infrastructure**: We seem to have K8s/Woodpecker integration via `deploy-k8s` skill. - -## 2. Functionality Gaps (What is missing) - -### Critical Infrastructure -* **Missing Component Types**: The cookbooks assume `type: worker`, `type: postgres`, and `type: redis`. Currently, `internal/adapter/templates/templates` only shows `go-api`, `astro-landing`, and `skeleton`. - * *Risk:* `add-db` and `add-worker` steps will fail in the runner. -* **Missing Shared Packages**: - * `pkg/auth`: No standard JWT verification middleware. Agent will have to write security code from scratch (high risk). - * `pkg/redis` or `pkg/queue`: No standard wrapper for Redis interactions or job queues. - * `pkg/websocket`: No standard WS upgrader/handler. - -### SDLC Automation -* **Missing SDLC Commands**: The cookbooks rely on `/spec-feature`, `/design-feature`, and `/implement-feature`. These do **not** exist in `.claude/commands/`. -* **Missing API Logic**: The `rdev-api` endpoints for SDLC (`/sdlc/features`, `/builds`) need to be verified to ensure they support the "Prompt-to-Code" flow described in the cookbooks. - -## 3. Required Claude Configuration - -We need to create/update these to enable the agentic workflow: - -* **`.claude/commands/implement-feature.md`**: Instructions for the agent on how to take a requirement and produce code. -* **`.claude/commands/spec-feature.md`**: Instructions for generating a technical spec artifact. -* **`.claude/commands/design-feature.md`**: Instructions for generating a DB schema/API design artifact. -* **`.claude/skills/distributed-systems.md`**: A skill that teaches the agent about our specific patterns for Worker/API communication (e.g., "Always use `pkg/queue` for async tasks"). - -## 4. Required SDLC Commands (in `rdev-api`) - -The `rdev-api` needs to handle these operations (referenced in cookbooks): - -* `POST /projects/{id}/sdlc/features`: Register a feature. -* `POST /projects/{id}/builds`: Trigger an autonomous build task (Prompt -> Code). -* `GET /projects/{id}/sdlc/next`: The "Classifier" logic to tell the runner what to do next. - -## 5. Core Packages & Patterns to Implement - -Before running `slackpath-1`: - -1. **`pkg/auth`**: - * `GenerateToken(user User) (string, error)` - * `Middleware(secret string) func(http.Handler) http.Handler` - * `UserFromContext(ctx) User` - -2. **`pkg/queue`** (for Path 2): - * `Producer` interface (`Enqueue(job)`) - * `Consumer` interface (`RegisterHandler(type, func)`) - * Redis implementation. - -3. **`pkg/realtime`** (for Path 3): - * Websocket Hub implementation (register, unregister, broadcast). - * Redis Pub/Sub adapter for scaling across pods. - -## Action Plan - -1. **Scaffold Missing Templates**: Create `worker`, `redis` (helm chart wrapper), and `postgres` (helm chart wrapper) in `internal/adapter/templates/templates`. -2. **Build Shared Libs**: Implement `pkg/auth` and `pkg/queue` to standardize the "hard parts". -3. **Install SDLC Commands**: Create the missing markdown files in `.claude/commands`. -4. **Verify API**: Ensure `rdev-api` has the SDLC endpoints mounted.