fix: resolve systemic debt in worker and skeleton templates

Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
jordan 2026-02-04 23:44:55 -07:00
parent d69da6d627
commit 53862c773b
25 changed files with 269 additions and 388 deletions

View File

@ -276,7 +276,6 @@ func main() {
podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{ podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{
Namespace: "rdev", Namespace: "rdev",
GiteaToken: infraCfg.GiteaToken, GiteaToken: infraCfg.GiteaToken,
Logger: logger,
}) })
} }
@ -485,7 +484,6 @@ func main() {
giteaClient, giteaClient,
worker.ExternalHealthConfig{ worker.ExternalHealthConfig{
CheckInterval: 30 * time.Second, CheckInterval: 30 * time.Second,
Logger: logger,
}, },
) )
externalHealthChecker.Start() externalHealthChecker.Start()
@ -540,7 +538,6 @@ func main() {
streamPub, streamPub,
&worker.QueueProcessorConfig{ &worker.QueueProcessorConfig{
PollPeriod: 5 * time.Second, PollPeriod: 5 * time.Second,
Logger: logger,
}, },
).WithWebhookDispatcher(webhookDispatcher) ).WithWebhookDispatcher(webhookDispatcher)
if err := queueProcessor.Start(); err != nil { if err := queueProcessor.Start(); err != nil {
@ -549,9 +546,9 @@ func main() {
} }
// Start work executor (cross-project worker pool, git via kubectl exec) // Start work executor (cross-project worker pool, git via kubectl exec)
buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil) buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, nil)
// VerifyExecutor for visual captures via Playwright pod // VerifyExecutor for visual captures via Playwright pod
verifyExecutor := worker.NewVerifyExecutor(k8sExecutor, streamPub, logger, &worker.VerifyExecutorConfig{ verifyExecutor := worker.NewVerifyExecutor(k8sExecutor, streamPub, &worker.VerifyExecutorConfig{
Namespace: namespace, Namespace: namespace,
PodName: "playwright-0", PodName: "playwright-0",
}) })
@ -561,11 +558,9 @@ func main() {
sdlcTaskExecutor = worker.NewSDLCTaskExecutor(worker.SDLCTaskExecutorConfig{ sdlcTaskExecutor = worker.NewSDLCTaskExecutor(worker.SDLCTaskExecutorConfig{
Namespace: namespace, Namespace: namespace,
PodGitOps: podGitOps, PodGitOps: podGitOps,
Logger: logger,
}) })
} }
workerCfg := worker.DefaultWorkExecutorConfig() workerCfg := worker.DefaultWorkExecutorConfig()
workerCfg.Logger = logger
workExecutor := worker.NewWorkExecutor( workExecutor := worker.NewWorkExecutor(
workerService, workerService,
workService, workService,
@ -591,7 +586,6 @@ func main() {
MaintenancePeriod: 1 * time.Minute, MaintenancePeriod: 1 * time.Minute,
MetricsPeriod: 15 * time.Second, MetricsPeriod: 15 * time.Second,
BuildAudit: buildAuditRepo, // Sync build audit when requeuing stale tasks BuildAudit: buildAuditRepo, // Sync build audit when requeuing stale tasks
Logger: logger,
}, },
) )
queueMaintenance.Start() queueMaintenance.Start()
@ -600,7 +594,6 @@ func main() {
operationCleanup := worker.NewOperationCleanup(operationRepo, &worker.OperationCleanupConfig{ operationCleanup := worker.NewOperationCleanup(operationRepo, &worker.OperationCleanupConfig{
RetentionPeriod: 30 * 24 * time.Hour, RetentionPeriod: 30 * 24 * time.Hour,
CleanupInterval: 1 * time.Hour, CleanupInterval: 1 * time.Hour,
Logger: logger,
}) })
operationCleanup.Start() operationCleanup.Start()

View File

@ -21,14 +21,21 @@ import (
var migrationsFS embed.FS var migrationsFS embed.FS
func main() { func main() {
// Initialize logger first (with defaults) so we can log config errors
logger := logging.New(logging.Config{
Level: logging.LevelInfo,
Format: logging.FormatJSON,
}).WithService("{{COMPONENT_NAME}}")
// Initialize configuration // Initialize configuration
cfg, err := workerconfig.Load() cfg, err := workerconfig.Load()
if err != nil { if err != nil {
panic("failed to load config: " + err.Error()) logger.Error("failed to load config", "error", err)
os.Exit(1)
} }
// Initialize logger // Reconfigure logger with loaded config
logger := logging.New(logging.Config{ logger = logging.New(logging.Config{
Level: logging.ParseLevel(cfg.Logging.Level), Level: logging.ParseLevel(cfg.Logging.Level),
Format: logging.ParseFormat(cfg.Logging.Format), Format: logging.ParseFormat(cfg.Logging.Format),
Environment: cfg.AppConfig.Environment, Environment: cfg.AppConfig.Environment,
@ -87,18 +94,22 @@ func main() {
sig := <-sigCh sig := <-sigCh
logger.Info("received shutdown signal", "signal", sig.String()) logger.Info("received shutdown signal", "signal", sig.String())
// Trigger graceful shutdown // Trigger graceful shutdown with grace period
logger.Info("initiating graceful shutdown")
cancel() cancel()
// Give in-flight jobs time to complete // Give in-flight jobs time to complete (grace period)
time.Sleep(2 * time.Second) // This allows handlers to notice context cancellation and finish cleanly.
const shutdownGracePeriod = 5 * time.Second
time.Sleep(shutdownGracePeriod)
logger.Info("{{COMPONENT_NAME}} worker stopped") logger.Info("{{COMPONENT_NAME}} worker stopped")
} }
// runStaleJobRecovery periodically requeues jobs that have been running too long. // runStaleJobRecovery periodically requeues jobs that have been running too long.
func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) { func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
ticker := time.NewTicker(time.Minute) const staleCheckInterval = time.Minute
ticker := time.NewTicker(staleCheckInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {

View File

@ -108,25 +108,26 @@ func (h *Handler) processNextJob(ctx context.Context) error {
h.mu.RUnlock() h.mu.RUnlock()
if !ok { if !ok {
errMsg := fmt.Sprintf("unknown job type: %s", job.Type)
h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type) h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
return h.queue.Fail(ctx, job.ID, errMsg) return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type))
} }
// Create job context with timeout // Apply middleware and process (TimeoutMiddleware handles the deadline)
jobCtx, cancel := context.WithTimeout(ctx, h.config.JobTimeout)
defer cancel()
// Apply middleware and process
wrappedHandler := queue.Chain( wrappedHandler := queue.Chain(
queue.RecoveryMiddleware(h.logger), queue.RecoveryMiddleware(h.logger),
queue.LoggingMiddleware(h.logger), queue.LoggingMiddleware(h.logger),
queue.TimeoutMiddleware(h.config.JobTimeout), queue.TimeoutMiddleware(h.config.JobTimeout),
)(handler) )(handler)
// Use parent context - TimeoutMiddleware applies the job timeout
jobCtx := ctx
_ = jobCtx // jobCtx used below
if err := wrappedHandler(jobCtx, job); err != nil { if err := wrappedHandler(jobCtx, job); err != nil {
h.logger.Debug("job handler failed", "job_id", job.ID, "error", err) // Truncate error message to prevent log bloat and potential data leakage
return h.queue.Fail(ctx, job.ID, err.Error()) errMsg := truncateErrorMessage(err.Error(), 1000)
h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
return h.queue.Fail(ctx, job.ID, errMsg)
} }
return h.queue.Ack(ctx, job.ID) return h.queue.Ack(ctx, job.ID)
@ -136,3 +137,11 @@ func (h *Handler) processNextJob(ctx context.Context) error {
func (h *Handler) WorkerID() string { func (h *Handler) WorkerID() string {
return h.workerID return h.workerID
} }
// truncateErrorMessage limits error message length to prevent log bloat.
func truncateErrorMessage(msg string, maxLen int) string {
if len(msg) <= maxLen {
return msg
}
return msg[:maxLen-3] + "..."
}

View File

@ -2,6 +2,7 @@ package auth
import ( import (
"context" "context"
"errors"
) )
// contextKey is a private type for context keys to prevent collisions. // contextKey is a private type for context keys to prevent collisions.
@ -25,15 +26,26 @@ func GetUser(ctx context.Context) *User {
} }
// MustGetUser retrieves the user from the context. // MustGetUser retrieves the user from the context.
// Panics if no user is present. // Panics if no user is present - use only in handlers protected by RequireAuth middleware.
// For non-middleware contexts, prefer GetUserOrError which returns an error.
func MustGetUser(ctx context.Context) *User { func MustGetUser(ctx context.Context) *User {
user := GetUser(ctx) user := GetUser(ctx)
if user == nil { if user == nil {
panic("auth: user not found in context") panic("auth: user not found in context - ensure RequireAuth middleware is applied")
} }
return user return user
} }
// GetUserOrError retrieves the user from the context, returning an error if not present.
// Prefer this over MustGetUser when panic recovery is not guaranteed.
func GetUserOrError(ctx context.Context) (*User, error) {
user := GetUser(ctx)
if user == nil {
return nil, errors.New("auth: user not found in context")
}
return user, nil
}
// IsAuthenticated returns true if a user is present in the context. // IsAuthenticated returns true if a user is present in the context.
func IsAuthenticated(ctx context.Context) bool { func IsAuthenticated(ctx context.Context) bool {
return GetUser(ctx) != nil return GetUser(ctx) != nil

View File

@ -69,16 +69,16 @@ func (v *JWTValidator) Validate(ctx context.Context, tokenString string) (*User,
switch token.Method.(type) { switch token.Method.(type) {
case *jwt.SigningMethodHMAC: case *jwt.SigningMethodHMAC:
if v.secret == nil { if v.secret == nil {
return nil, fmt.Errorf("HMAC secret not configured") return nil, fmt.Errorf("%w: HMAC secret not configured", ErrInvalidToken)
} }
return v.secret, nil return v.secret, nil
case *jwt.SigningMethodRSA, *jwt.SigningMethodECDSA: case *jwt.SigningMethodRSA, *jwt.SigningMethodECDSA:
if v.publicKey == nil { if v.publicKey == nil {
return nil, fmt.Errorf("public key not configured") return nil, fmt.Errorf("%w: public key not configured", ErrInvalidToken)
} }
return v.publicKey, nil return v.publicKey, nil
default: default:
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) return nil, fmt.Errorf("%w: unexpected signing method %v", ErrInvalidToken, token.Header["alg"])
} }
}) })
@ -86,7 +86,7 @@ func (v *JWTValidator) Validate(ctx context.Context, tokenString string) (*User,
if errors.Is(err, jwt.ErrTokenExpired) { if errors.Is(err, jwt.ErrTokenExpired) {
return nil, ErrExpiredToken return nil, ErrExpiredToken
} }
return nil, fmt.Errorf("%w: %v", ErrInvalidToken, err) return nil, fmt.Errorf("%w: %w", ErrInvalidToken, err)
} }
claims, ok := token.Claims.(*JWTClaims) claims, ok := token.Claims.(*JWTClaims)

View File

@ -44,13 +44,25 @@ func (q *PostgresQueue) Enqueue(ctx context.Context, jobType string, payload map
// EnqueueWithOptions adds a job with custom configuration. // EnqueueWithOptions adds a job with custom configuration.
func (q *PostgresQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) { func (q *PostgresQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) {
// Validate required fields
if job.Type == "" {
return "", fmt.Errorf("job type is required: %w", ErrJobNotFound)
}
job.ID = uuid.New().String() job.ID = uuid.New().String()
job.Status = StatusPending job.Status = StatusPending
job.CreatedAt = time.Now().UTC() job.CreatedAt = time.Now().UTC()
// Apply defaults and constraints
if job.MaxRetries == 0 { if job.MaxRetries == 0 {
job.MaxRetries = 3 job.MaxRetries = 3
} }
if job.MaxRetries > 100 {
job.MaxRetries = 100 // Cap at reasonable limit
}
if job.Payload == nil {
job.Payload = make(map[string]any)
}
payloadJSON, err := json.Marshal(job.Payload) payloadJSON, err := json.Marshal(job.Payload)
if err != nil { if err != nil {
@ -117,7 +129,10 @@ func (q *PostgresQueue) Ack(ctx context.Context, jobID string) error {
return fmt.Errorf("ack job: %w", err) return fmt.Errorf("ack job: %w", err)
} }
rows, _ := result.RowsAffected() rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("check rows affected: %w", err)
}
if rows == 0 { if rows == 0 {
return ErrJobNotFound return ErrJobNotFound
} }
@ -160,12 +175,20 @@ func (q *PostgresQueue) Fail(ctx context.Context, jobID string, errMsg string) e
return fmt.Errorf("fail job: %w", err) return fmt.Errorf("fail job: %w", err)
} }
rows, _ := result.RowsAffected() rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("check rows affected: %w", err)
}
if rows == 0 { if rows == 0 {
return ErrJobNotFound return ErrJobNotFound
} }
q.logger.Debug("job failed", "job_id", jobID, "error", errMsg) // Truncate error message to prevent log bloat (limit to 500 chars for logging)
logErrMsg := errMsg
if len(logErrMsg) > 500 {
logErrMsg = logErrMsg[:497] + "..."
}
q.logger.Debug("job failed", "job_id", jobID, "error", logErrMsg)
return nil return nil
} }
@ -179,7 +202,10 @@ func (q *PostgresQueue) Heartbeat(ctx context.Context, jobID string) error {
return fmt.Errorf("heartbeat job: %w", err) return fmt.Errorf("heartbeat job: %w", err)
} }
rows, _ := result.RowsAffected() rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("check rows affected: %w", err)
}
if rows == 0 { if rows == 0 {
return ErrJobNotFound return ErrJobNotFound
} }
@ -201,7 +227,10 @@ func (q *PostgresQueue) RequeueStale(ctx context.Context, timeout time.Duration)
return 0, fmt.Errorf("requeue stale jobs: %w", err) return 0, fmt.Errorf("requeue stale jobs: %w", err)
} }
count, _ := result.RowsAffected() count, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("check rows affected: %w", err)
}
if count > 0 { if count > 0 {
q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout) q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout)
} }

View File

@ -2,9 +2,9 @@
// //
// This package implements a reliable producer/consumer pattern using: // This package implements a reliable producer/consumer pattern using:
// - Atomic dequeue with FOR UPDATE SKIP LOCKED // - Atomic dequeue with FOR UPDATE SKIP LOCKED
// - Automatic retry with exponential backoff // - Automatic retry (immediate requeue up to max_retries)
// - Job priority and ordering // - Job priority and ordering
// - Stale job recovery // - Stale job recovery via RequeueStale
// //
// Usage: // Usage:
// //

View File

@ -6,12 +6,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/sanitize" "github.com/orchard9/rdev/internal/sanitize"
@ -26,7 +26,6 @@ type ProjectService struct {
queue port.CommandQueue // Optional command queue queue port.CommandQueue // Optional command queue
webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher webhookDispatcher port.WebhookDispatcher // Optional webhook dispatcher
agentRegistry port.CodeAgentRegistry // Optional code agent registry agentRegistry port.CodeAgentRegistry // Optional code agent registry
logger *slog.Logger
cmdID atomic.Uint64 cmdID atomic.Uint64
} }
@ -40,16 +39,9 @@ func NewProjectService(
projects: projects, projects: projects,
executor: executor, executor: executor,
streams: streams, streams: streams,
logger: slog.Default(),
} }
} }
// WithLogger sets a custom logger for the service.
func (s *ProjectService) WithLogger(logger *slog.Logger) *ProjectService {
s.logger = logger
return s
}
// WithAuditLogger sets an audit logger for the service. // WithAuditLogger sets an audit logger for the service.
func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService { func (s *ProjectService) WithAuditLogger(auditLogger port.AuditLogger) *ProjectService {
s.auditLogger = auditLogger s.auditLogger = auditLogger
@ -83,15 +75,17 @@ type AuditContext struct {
// List returns all available projects with refreshed status. // List returns all available projects with refreshed status.
func (s *ProjectService) List(ctx context.Context) ([]domain.Project, error) { func (s *ProjectService) List(ctx context.Context) ([]domain.Project, error) {
log := logging.FromContext(ctx).WithService("ProjectService")
// Refresh status from Kubernetes // Refresh status from Kubernetes
if err := s.projects.RefreshStatus(ctx); err != nil { if err := s.projects.RefreshStatus(ctx); err != nil {
s.logger.Warn("failed to refresh project status", "error", err) log.Warn("failed to refresh project status", logging.FieldError, err)
} }
return s.projects.List(ctx) return s.projects.List(ctx)
} }
// Get returns a specific project by ID. // Get returns a specific project by ID.
func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) { func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
log := logging.FromContext(ctx).WithService("ProjectService")
project, err := s.projects.Get(ctx, id) project, err := s.projects.Get(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -99,7 +93,7 @@ func (s *ProjectService) Get(ctx context.Context, id domain.ProjectID) (*domain.
// Refresh status // Refresh status
if refreshErr := s.projects.RefreshStatus(ctx); refreshErr != nil { if refreshErr := s.projects.RefreshStatus(ctx); refreshErr != nil {
s.logger.Warn("failed to refresh project status", "project", id, "error", refreshErr) log.Warn("failed to refresh project status", logging.FieldProjectID, id, logging.FieldError, refreshErr)
} }
return project, nil return project, nil
@ -168,6 +162,7 @@ func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeReq
// Log audit start if audit logger is configured // Log audit start if audit logger is configured
if s.auditLogger != nil && req.Audit != nil { if s.auditLogger != nil && req.Audit != nil {
log := logging.FromContext(ctx).WithService("ProjectService")
argsJSON, _ := json.Marshal(cmd.Args) argsJSON, _ := json.Marshal(cmd.Args)
auditEntry := &domain.AuditLogEntry{ auditEntry := &domain.AuditLogEntry{
ID: uuid.New().String(), ID: uuid.New().String(),
@ -182,7 +177,7 @@ func (s *ProjectService) ExecuteClaude(ctx context.Context, req ExecuteClaudeReq
Status: domain.AuditStatusRunning, Status: domain.AuditStatusRunning,
} }
if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil {
s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err)
} }
} }
@ -222,6 +217,7 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel() defer cancel()
log := logging.FromContext(ctx).WithService("ProjectService")
streamID := string(cmd.ID) streamID := string(cmd.ID)
var lastEventID string var lastEventID string
var outputSizeBytes int64 var outputSizeBytes int64
@ -283,7 +279,7 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) {
OutputSizeBytes: outputSizeBytes, OutputSizeBytes: outputSizeBytes,
} }
if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil { if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil {
s.logger.Warn("failed to log audit end", "command_id", cmd.ID, "error", err) log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
} }
} }
@ -311,10 +307,10 @@ func (s *ProjectService) executeCommand(podName string, cmd *domain.Command) {
Error: errorMsg, Error: errorMsg,
}) })
s.logger.Debug("command completed", log.Debug("command completed",
"command_id", cmd.ID, "command_id", cmd.ID,
"exit_code", result.ExitCode, "exit_code", result.ExitCode,
"duration_ms", result.DurationMs, logging.FieldDuration, result.DurationMs,
"last_event_id", lastEventID, "last_event_id", lastEventID,
"complete_event_id", eventID, "complete_event_id", eventID,
) )
@ -340,10 +336,11 @@ func (s *ProjectService) dispatchWebhookEvent(ctx context.Context, projectID str
} }
if err := s.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { if err := s.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil {
s.logger.Warn("failed to dispatch webhook event", log := logging.FromContext(ctx).WithService("ProjectService")
"project_id", projectID, log.Warn("failed to dispatch webhook event",
logging.FieldProjectID, projectID,
"event_type", eventType, "event_type", eventType,
"error", err, logging.FieldError, err,
) )
} }
} }

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -34,6 +35,7 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel() defer cancel()
log := logging.FromContext(ctx).WithService("ProjectService")
streamID := string(cmd.ID) streamID := string(cmd.ID)
var lastEventID string var lastEventID string
var outputSizeBytes int64 var outputSizeBytes int64
@ -140,7 +142,7 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A
OutputSizeBytes: outputSizeBytes, OutputSizeBytes: outputSizeBytes,
} }
if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil { if err := s.auditLogger.LogCommandEnd(ctx, string(cmd.ID), auditResult); err != nil {
s.logger.Warn("failed to log audit end", "command_id", cmd.ID, "error", err) log.Warn("failed to log audit end", "command_id", cmd.ID, logging.FieldError, err)
} }
} }
@ -168,12 +170,12 @@ func (s *ProjectService) executeAgentCommand(agent port.CodeAgent, req *domain.A
Error: errorMsg, Error: errorMsg,
}) })
s.logger.Debug("agent command completed", log.Debug("agent command completed",
"command_id", cmd.ID, "command_id", cmd.ID,
"provider", agent.Provider(), "provider", agent.Provider(),
"session_id", result.SessionID, "session_id", result.SessionID,
"exit_code", result.ExitCode, "exit_code", result.ExitCode,
"duration_ms", result.DurationMs, logging.FieldDuration, result.DurationMs,
"last_event_id", lastEventID, "last_event_id", lastEventID,
"complete_event_id", eventID, "complete_event_id", eventID,
) )

View File

@ -10,6 +10,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/sanitize" "github.com/orchard9/rdev/internal/sanitize"
) )
@ -66,6 +67,7 @@ func (s *ProjectService) ExecuteShell(ctx context.Context, req ExecuteShellReque
// Log audit start if audit logger is configured // Log audit start if audit logger is configured
if s.auditLogger != nil && req.Audit != nil { if s.auditLogger != nil && req.Audit != nil {
log := logging.FromContext(ctx).WithService("ProjectService")
argsJSON, _ := json.Marshal(cmd.Args) argsJSON, _ := json.Marshal(cmd.Args)
auditEntry := &domain.AuditLogEntry{ auditEntry := &domain.AuditLogEntry{
ID: uuid.New().String(), ID: uuid.New().String(),
@ -80,7 +82,7 @@ func (s *ProjectService) ExecuteShell(ctx context.Context, req ExecuteShellReque
Status: domain.AuditStatusRunning, Status: domain.AuditStatusRunning,
} }
if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil {
s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err)
} }
} }
@ -146,6 +148,7 @@ func (s *ProjectService) ExecuteGit(ctx context.Context, req ExecuteGitRequest)
// Log audit start if audit logger is configured // Log audit start if audit logger is configured
if s.auditLogger != nil && req.Audit != nil { if s.auditLogger != nil && req.Audit != nil {
log := logging.FromContext(ctx).WithService("ProjectService")
argsJSON, _ := json.Marshal(cmd.Args) argsJSON, _ := json.Marshal(cmd.Args)
auditEntry := &domain.AuditLogEntry{ auditEntry := &domain.AuditLogEntry{
ID: uuid.New().String(), ID: uuid.New().String(),
@ -160,7 +163,7 @@ func (s *ProjectService) ExecuteGit(ctx context.Context, req ExecuteGitRequest)
Status: domain.AuditStatusRunning, Status: domain.AuditStatusRunning,
} }
if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil { if err := s.auditLogger.LogCommandStart(ctx, auditEntry); err != nil {
s.logger.Warn("failed to log audit start", "command_id", cmdID, "error", err) log.Warn("failed to log audit start", "command_id", cmdID, logging.FieldError, err)
} }
} }

View File

@ -3,11 +3,11 @@ package worker
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"strings" "strings"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -29,7 +29,6 @@ type BuildExecutor struct {
agentRegistry port.CodeAgentRegistry agentRegistry port.CodeAgentRegistry
podGitOps *PodGitOperations // Post-build git operations (runs in pod) podGitOps *PodGitOperations // Post-build git operations (runs in pod)
streams port.StreamPublisher // SSE stream publisher for real-time events streams port.StreamPublisher // SSE stream publisher for real-time events
logger *slog.Logger
defaultPodName string // Default claudebox pod for agent execution defaultPodName string // Default claudebox pod for agent execution
namespace string // Kubernetes namespace for the pod namespace string // Kubernetes namespace for the pod
} }
@ -45,12 +44,8 @@ func NewBuildExecutor(
agentRegistry port.CodeAgentRegistry, agentRegistry port.CodeAgentRegistry,
podGitOps *PodGitOperations, podGitOps *PodGitOperations,
streams port.StreamPublisher, streams port.StreamPublisher,
logger *slog.Logger,
cfg *BuildExecutorConfig, cfg *BuildExecutorConfig,
) *BuildExecutor { ) *BuildExecutor {
if logger == nil {
logger = slog.Default()
}
if cfg == nil { if cfg == nil {
cfg = &BuildExecutorConfig{ cfg = &BuildExecutorConfig{
DefaultPodName: "claudebox-0", DefaultPodName: "claudebox-0",
@ -61,7 +56,6 @@ func NewBuildExecutor(
agentRegistry: agentRegistry, agentRegistry: agentRegistry,
podGitOps: podGitOps, podGitOps: podGitOps,
streams: streams, streams: streams,
logger: logger.With("component", "build-executor"),
defaultPodName: cfg.DefaultPodName, defaultPodName: cfg.DefaultPodName,
namespace: cfg.Namespace, namespace: cfg.Namespace,
} }
@ -69,6 +63,7 @@ func NewBuildExecutor(
// Execute runs a build task by translating its spec into an agent call. // Execute runs a build task by translating its spec into an agent call.
func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
log := logging.FromContext(ctx).WithWorker("build-executor")
start := time.Now() start := time.Now()
streamID := task.ID // Use task ID as stream ID for SSE streamID := task.ID // Use task ID as stream ID for SSE
@ -126,7 +121,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom
} }
} }
b.logger.Info("ensuring git repository is ready", log.Info("ensuring git repository is ready",
"task_id", task.ID, "task_id", task.ID,
"pod", podName, "pod", podName,
"workDir", workDir, "workDir", workDir,
@ -168,7 +163,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom
const maxOutputSize = 1 << 20 // 1MB const maxOutputSize = 1 << 20 // 1MB
var outputBuilder strings.Builder var outputBuilder strings.Builder
b.logger.Info("executing build via agent", log.Info("executing build via agent",
"task_id", task.ID, "task_id", task.ID,
"project_id", task.ProjectID, "project_id", task.ProjectID,
"agent", agent.Name(), "agent", agent.Name(),
@ -261,7 +256,7 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom
gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush) gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush)
if gitResult.Error != nil { if gitResult.Error != nil {
b.logger.Warn("post-build git operations failed", log.Warn("post-build git operations failed",
"task_id", task.ID, "task_id", task.ID,
"error", gitResult.Error, "error", gitResult.Error,
) )
@ -270,14 +265,14 @@ func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *dom
} else if gitResult.HasChanges { } else if gitResult.HasChanges {
result.CommitSHA = gitResult.CommitSHA result.CommitSHA = gitResult.CommitSHA
result.FilesChanged = gitResult.FilesChanged result.FilesChanged = gitResult.FilesChanged
b.logger.Info("post-build git operations completed", log.Info("post-build git operations completed",
"task_id", task.ID, "task_id", task.ID,
"commit", gitResult.CommitSHA, "commit", gitResult.CommitSHA,
"files", len(gitResult.FilesChanged), "files", len(gitResult.FilesChanged),
"pushed", gitResult.Pushed, "pushed", gitResult.Pushed,
) )
} else { } else {
b.logger.Info("no changes to commit after build", log.Info("no changes to commit after build",
"task_id", task.ID, "task_id", task.ID,
) )
} }

View File

@ -2,11 +2,11 @@ package worker
import ( import (
"context" "context"
"log/slog"
"sync" "sync"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -19,7 +19,6 @@ type ExternalHealthChecker struct {
git port.ExternalHealthChecker // gitea git port.ExternalHealthChecker // gitea
interval time.Duration interval time.Duration
logger *slog.Logger
// Internal state (thread-safe) // Internal state (thread-safe)
mu sync.RWMutex mu sync.RWMutex
@ -35,14 +34,12 @@ type ExternalHealthChecker struct {
type ExternalHealthConfig struct { type ExternalHealthConfig struct {
// CheckInterval is how often to check external systems. Default: 30s. // CheckInterval is how often to check external systems. Default: 30s.
CheckInterval time.Duration CheckInterval time.Duration
Logger *slog.Logger
} }
// DefaultExternalHealthConfig returns sensible defaults. // DefaultExternalHealthConfig returns sensible defaults.
func DefaultExternalHealthConfig() ExternalHealthConfig { func DefaultExternalHealthConfig() ExternalHealthConfig {
return ExternalHealthConfig{ return ExternalHealthConfig{
CheckInterval: 30 * time.Second, CheckInterval: 30 * time.Second,
Logger: slog.Default(),
} }
} }
@ -57,9 +54,6 @@ func NewExternalHealthChecker(
if cfg.CheckInterval == 0 { if cfg.CheckInterval == 0 {
cfg.CheckInterval = 30 * time.Second cfg.CheckInterval = 30 * time.Second
} }
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -68,7 +62,6 @@ func NewExternalHealthChecker(
ci: ci, ci: ci,
git: git, git: git,
interval: cfg.CheckInterval, interval: cfg.CheckInterval,
logger: cfg.Logger.With("component", "external-health"),
statuses: make(map[domain.ExternalSystem]domain.ExternalSystemStatus), statuses: make(map[domain.ExternalSystem]domain.ExternalSystemStatus),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -77,7 +70,8 @@ func NewExternalHealthChecker(
// Start begins the background check loop. // Start begins the background check loop.
func (c *ExternalHealthChecker) Start() { func (c *ExternalHealthChecker) Start() {
c.logger.Info("external health checker started", "interval", c.interval) log := logging.FromContext(c.ctx).WithWorker("external-health")
log.Info("external health checker started", "interval", c.interval)
c.wg.Add(1) c.wg.Add(1)
go c.checkLoop() go c.checkLoop()
@ -85,10 +79,11 @@ func (c *ExternalHealthChecker) Start() {
// Stop gracefully shuts down the checker. // Stop gracefully shuts down the checker.
func (c *ExternalHealthChecker) Stop() { func (c *ExternalHealthChecker) Stop() {
c.logger.Info("external health checker stopping") log := logging.FromContext(c.ctx).WithWorker("external-health")
log.Info("external health checker stopping")
c.cancel() c.cancel()
c.wg.Wait() c.wg.Wait()
c.logger.Info("external health checker stopped") log.Info("external health checker stopped")
} }
// GetStatus returns the cached status for a specific system. // GetStatus returns the cached status for a specific system.
@ -198,6 +193,8 @@ func (c *ExternalHealthChecker) runChecks() {
// updateStatus updates cached status and logs/metrics on state changes. // updateStatus updates cached status and logs/metrics on state changes.
func (c *ExternalHealthChecker) updateStatus(status domain.ExternalSystemStatus) { func (c *ExternalHealthChecker) updateStatus(status domain.ExternalSystemStatus) {
log := logging.FromContext(c.ctx).WithWorker("external-health")
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -214,31 +211,31 @@ func (c *ExternalHealthChecker) updateStatus(status domain.ExternalSystemStatus)
if !existed { if !existed {
// First check // First check
if status.Healthy { if status.Healthy {
c.logger.Info("external system healthy", log.Info("external system healthy",
"system", status.System, "system", status.System,
"url", status.URL, "url", status.URL,
"latency", status.Latency, "latency", status.Latency,
) )
} else { } else {
c.logger.Warn("external system unhealthy", log.Warn("external system unhealthy",
"system", status.System, "system", status.System,
"url", status.URL, "url", status.URL,
"error", status.Error, logging.FieldError, status.Error,
) )
} }
} else if prev.Healthy != status.Healthy { } else if prev.Healthy != status.Healthy {
// State changed // State changed
if status.Healthy { if status.Healthy {
c.logger.Info("external system recovered", log.Info("external system recovered",
"system", status.System, "system", status.System,
"url", status.URL, "url", status.URL,
"latency", status.Latency, "latency", status.Latency,
) )
} else { } else {
c.logger.Warn("external system became unhealthy", log.Warn("external system became unhealthy",
"system", status.System, "system", status.System,
"url", status.URL, "url", status.URL,
"error", status.Error, logging.FieldError, status.Error,
) )
} }
} }

View File

@ -374,7 +374,7 @@ func newTestDeps() *testDeps {
WithBuildAudit(audit) WithBuildAudit(audit)
workSvc := service.NewWorkService(queue) workSvc := service.NewWorkService(queue)
buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil, nil) buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil)
return &testDeps{ return &testDeps{
queue: queue, queue: queue,

View File

@ -2,10 +2,10 @@ package worker
import ( import (
"context" "context"
"log/slog"
"sync" "sync"
"time" "time"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -13,7 +13,6 @@ import (
// Operations older than the retention period (default 30 days) are deleted. // Operations older than the retention period (default 30 days) are deleted.
type OperationCleanup struct { type OperationCleanup struct {
repo port.OperationRepository repo port.OperationRepository
logger *slog.Logger
retentionPeriod time.Duration retentionPeriod time.Duration
cleanupInterval time.Duration cleanupInterval time.Duration
@ -31,8 +30,6 @@ type OperationCleanupConfig struct {
// CleanupInterval is how often to run cleanup. // CleanupInterval is how often to run cleanup.
// Default: 1 hour. // Default: 1 hour.
CleanupInterval time.Duration CleanupInterval time.Duration
Logger *slog.Logger
} }
// DefaultOperationCleanupConfig returns sensible defaults. // DefaultOperationCleanupConfig returns sensible defaults.
@ -40,7 +37,6 @@ func DefaultOperationCleanupConfig() *OperationCleanupConfig {
return &OperationCleanupConfig{ return &OperationCleanupConfig{
RetentionPeriod: 30 * 24 * time.Hour, // 30 days RetentionPeriod: 30 * 24 * time.Hour, // 30 days
CleanupInterval: 1 * time.Hour, CleanupInterval: 1 * time.Hour,
Logger: slog.Default(),
} }
} }
@ -54,7 +50,6 @@ func NewOperationCleanup(repo port.OperationRepository, cfg *OperationCleanupCon
return &OperationCleanup{ return &OperationCleanup{
repo: repo, repo: repo,
logger: cfg.Logger.With("component", "operation-cleanup"),
retentionPeriod: cfg.RetentionPeriod, retentionPeriod: cfg.RetentionPeriod,
cleanupInterval: cfg.CleanupInterval, cleanupInterval: cfg.CleanupInterval,
ctx: ctx, ctx: ctx,
@ -64,7 +59,8 @@ func NewOperationCleanup(repo port.OperationRepository, cfg *OperationCleanupCon
// Start begins the cleanup loop. // Start begins the cleanup loop.
func (c *OperationCleanup) Start() { func (c *OperationCleanup) Start() {
c.logger.Info("operation cleanup started", log := logging.FromContext(c.ctx).WithWorker("operation-cleanup")
log.Info("operation cleanup started",
"retention_period", c.retentionPeriod, "retention_period", c.retentionPeriod,
"cleanup_interval", c.cleanupInterval, "cleanup_interval", c.cleanupInterval,
) )
@ -75,10 +71,11 @@ func (c *OperationCleanup) Start() {
// Stop gracefully shuts down the cleanup worker. // Stop gracefully shuts down the cleanup worker.
func (c *OperationCleanup) Stop() { func (c *OperationCleanup) Stop() {
c.logger.Info("operation cleanup stopping") log := logging.FromContext(c.ctx).WithWorker("operation-cleanup")
log.Info("operation cleanup stopping")
c.cancel() c.cancel()
c.wg.Wait() c.wg.Wait()
c.logger.Info("operation cleanup stopped") log.Info("operation cleanup stopped")
} }
// cleanupLoop runs periodic cleanup. // cleanupLoop runs periodic cleanup.
@ -106,18 +103,20 @@ func (c *OperationCleanup) runCleanup() {
ctx, cancel := context.WithTimeout(c.ctx, TimeoutMaintenance) ctx, cancel := context.WithTimeout(c.ctx, TimeoutMaintenance)
defer cancel() defer cancel()
log := logging.FromContext(ctx).WithWorker("operation-cleanup")
cutoff := time.Now().Add(-c.retentionPeriod) cutoff := time.Now().Add(-c.retentionPeriod)
deleted, err := c.repo.DeleteOlderThan(ctx, cutoff) deleted, err := c.repo.DeleteOlderThan(ctx, cutoff)
if err != nil { if err != nil {
c.logger.Error("failed to cleanup old operations", log.Error("failed to cleanup old operations",
"error", err, logging.FieldError, err,
"cutoff", cutoff, "cutoff", cutoff,
) )
return return
} }
if deleted > 0 { if deleted > 0 {
c.logger.Info("cleaned up old operations", log.Info("cleaned up old operations",
"deleted", deleted, "deleted", deleted,
"cutoff", cutoff, "cutoff", cutoff,
) )

View File

@ -4,9 +4,10 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"log/slog"
"os/exec" "os/exec"
"strings" "strings"
"github.com/orchard9/rdev/internal/logging"
) )
// PodGitOperations provides git operations that run inside a Kubernetes pod // PodGitOperations provides git operations that run inside a Kubernetes pod
@ -17,7 +18,6 @@ type PodGitOperations struct {
giteaToken string giteaToken string
gitUser string gitUser string
gitEmail string gitEmail string
logger *slog.Logger
} }
// PodGitOperationsConfig configures pod git operations. // PodGitOperationsConfig configures pod git operations.
@ -33,8 +33,6 @@ type PodGitOperationsConfig struct {
// GitEmail is the git commit author email. // GitEmail is the git commit author email.
GitEmail string GitEmail string
Logger *slog.Logger
} }
// NewPodGitOperations creates a new pod git operations helper. // NewPodGitOperations creates a new pod git operations helper.
@ -45,15 +43,11 @@ func NewPodGitOperations(cfg PodGitOperationsConfig) *PodGitOperations {
if cfg.GitEmail == "" { if cfg.GitEmail == "" {
cfg.GitEmail = "worker@threesix.ai" cfg.GitEmail = "worker@threesix.ai"
} }
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
return &PodGitOperations{ return &PodGitOperations{
namespace: cfg.Namespace, namespace: cfg.Namespace,
giteaToken: cfg.GiteaToken, giteaToken: cfg.GiteaToken,
gitUser: cfg.GitUser, gitUser: cfg.GitUser,
gitEmail: cfg.GitEmail, gitEmail: cfg.GitEmail,
logger: cfg.Logger.With("component", "pod-git-ops"),
} }
} }
@ -87,6 +81,7 @@ func (g *PodGitOperations) IsGitRepo(ctx context.Context, podName, workDir strin
// If the workspace already contains a git repo, it pulls the latest changes instead. // If the workspace already contains a git repo, it pulls the latest changes instead.
// If the workspace exists but is not a git repo, it clears the directory first. // If the workspace exists but is not a git repo, it clears the directory first.
func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, cloneURL string) *CloneResult { func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, cloneURL string) *CloneResult {
log := logging.FromContext(ctx).WithWorker("pod-git-ops")
result := &CloneResult{} result := &CloneResult{}
if cloneURL == "" { if cloneURL == "" {
@ -103,24 +98,24 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon
// Normalize URLs for comparison (both should be HTTPS) // Normalize URLs for comparison (both should be HTTPS)
if err == nil && currentRemote == expectedURL { if err == nil && currentRemote == expectedURL {
g.logger.Info("workspace is already a git repo with correct remote, pulling latest", log.Info("workspace is already a git repo with correct remote, pulling latest",
"pod", podName, logging.FieldPodName, podName,
"workDir", workDir, "workDir", workDir,
) )
// Pull latest changes // Pull latest changes
if err := g.runGitInPod(ctx, podName, workDir, "pull", "--ff-only"); err != nil { if err := g.runGitInPod(ctx, podName, workDir, "pull", "--ff-only"); err != nil {
// Pull failed, but repo exists - not fatal, might have local changes // Pull failed, but repo exists - not fatal, might have local changes
g.logger.Warn("git pull failed, continuing with existing state", log.Warn("git pull failed, continuing with existing state",
"pod", podName, logging.FieldPodName, podName,
"error", err, logging.FieldError, err,
) )
} }
return result return result
} }
// Remote doesn't match - this is a different project's repo // Remote doesn't match - this is a different project's repo
g.logger.Info("workspace has different git remote, will re-clone", log.Info("workspace has different git remote, will re-clone",
"pod", podName, logging.FieldPodName, podName,
"workDir", workDir, "workDir", workDir,
"currentRemote", currentRemote, "currentRemote", currentRemote,
"expectedURL", expectedURL, "expectedURL", expectedURL,
@ -129,8 +124,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon
// Check if directory exists but is not a git repo - clear it first // Check if directory exists but is not a git repo - clear it first
if g.dirExists(ctx, podName, workDir) { if g.dirExists(ctx, podName, workDir) {
g.logger.Info("workspace exists but is not a git repo, clearing", log.Info("workspace exists but is not a git repo, clearing",
"pod", podName, logging.FieldPodName, podName,
"workDir", workDir, "workDir", workDir,
) )
// Clear the directory contents (but keep the directory itself) // Clear the directory contents (but keep the directory itself)
@ -140,9 +135,9 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon
} }
cmd := exec.CommandContext(ctx, "kubectl", clearArgs...) cmd := exec.CommandContext(ctx, "kubectl", clearArgs...)
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
g.logger.Warn("failed to clear workspace, attempting clone anyway", log.Warn("failed to clear workspace, attempting clone anyway",
"pod", podName, logging.FieldPodName, podName,
"error", err, logging.FieldError, err,
) )
} }
} }
@ -155,8 +150,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon
authCloneURL = strings.Replace(cloneURL, "https://", "https://token:"+g.giteaToken+"@", 1) authCloneURL = strings.Replace(cloneURL, "https://", "https://token:"+g.giteaToken+"@", 1)
} }
g.logger.Info("cloning repository", log.Info("cloning repository",
"pod", podName, logging.FieldPodName, podName,
"workDir", workDir, "workDir", workDir,
"url", cloneURL, // Log without token "url", cloneURL, // Log without token
) )
@ -178,8 +173,8 @@ func (g *PodGitOperations) CloneRepo(ctx context.Context, podName, workDir, clon
} }
result.Cloned = true result.Cloned = true
g.logger.Info("repository cloned successfully", log.Info("repository cloned successfully",
"pod", podName, logging.FieldPodName, podName,
"workDir", workDir, "workDir", workDir,
) )
@ -205,6 +200,7 @@ func (g *PodGitOperations) dirExists(ctx context.Context, podName, path string)
// //
// This is the programmatic alternative to relying on LLMs for git operations. // This is the programmatic alternative to relying on LLMs for git operations.
func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, message string, push bool) *PostBuildResult { func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir, message string, push bool) *PostBuildResult {
log := logging.FromContext(ctx).WithWorker("pod-git-ops")
result := &PostBuildResult{} result := &PostBuildResult{}
// Configure git user for commits // Configure git user for commits
@ -224,7 +220,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir,
return result return result
} }
if strings.TrimSpace(status) == "" { if strings.TrimSpace(status) == "" {
g.logger.Info("no changes to commit", "pod", podName, "workDir", workDir) log.Info("no changes to commit", logging.FieldPodName, podName, "workDir", workDir)
return result return result
} }
result.HasChanges = true result.HasChanges = true
@ -261,8 +257,8 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir,
} }
result.CommitSHA = strings.TrimSpace(sha) result.CommitSHA = strings.TrimSpace(sha)
g.logger.Info("committed changes", log.Info("committed changes",
"pod", podName, logging.FieldPodName, podName,
"sha", result.CommitSHA, "sha", result.CommitSHA,
"files", len(result.FilesChanged), "files", len(result.FilesChanged),
) )
@ -275,7 +271,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir,
// This avoids putting the token in the URL which would be visible in logs // This avoids putting the token in the URL which would be visible in logs
credHelper := fmt.Sprintf("!f() { echo username=token; echo password=%s; }; f", g.giteaToken) credHelper := fmt.Sprintf("!f() { echo username=token; echo password=%s; }; f", g.giteaToken)
if err := g.runGitInPod(ctx, podName, workDir, "config", "credential.helper", credHelper); err != nil { if err := g.runGitInPod(ctx, podName, workDir, "config", "credential.helper", credHelper); err != nil {
g.logger.Warn("failed to configure credential helper", "error", err) log.Warn("failed to configure credential helper", logging.FieldError, err)
// Continue anyway - push might still work if pod has other auth configured // Continue anyway - push might still work if pod has other auth configured
} }
} }
@ -285,7 +281,7 @@ func (g *PodGitOperations) CommitAndPush(ctx context.Context, podName, workDir,
return result return result
} }
result.Pushed = true result.Pushed = true
g.logger.Info("pushed changes", "pod", podName, "sha", result.CommitSHA) log.Info("pushed changes", logging.FieldPodName, podName, "sha", result.CommitSHA)
} }
return result return result

View File

@ -2,11 +2,11 @@ package worker
import ( import (
"context" "context"
"log/slog"
"sync" "sync"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -18,7 +18,6 @@ type QueueMaintenance struct {
queue port.WorkQueue queue port.WorkQueue
registry port.WorkerRegistry registry port.WorkerRegistry
buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks
logger *slog.Logger
// Intervals // Intervals
staleTaskTimeout time.Duration staleTaskTimeout time.Duration
@ -57,8 +56,6 @@ type QueueMaintenanceConfig struct {
// BuildAudit syncs build audit status when requeuing stale tasks. // BuildAudit syncs build audit status when requeuing stale tasks.
// If nil, build audit is not updated (legacy behavior). // If nil, build audit is not updated (legacy behavior).
BuildAudit port.BuildAudit BuildAudit port.BuildAudit
Logger *slog.Logger
} }
// DefaultQueueMaintenanceConfig returns sensible defaults. // DefaultQueueMaintenanceConfig returns sensible defaults.
@ -69,7 +66,6 @@ func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
CleanupAge: 7 * 24 * time.Hour, CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 1 * time.Minute, MaintenancePeriod: 1 * time.Minute,
MetricsPeriod: 15 * time.Second, MetricsPeriod: 15 * time.Second,
Logger: slog.Default(),
} }
} }
@ -89,7 +85,6 @@ func NewQueueMaintenance(
queue: queue, queue: queue,
registry: registry, registry: registry,
buildAudit: cfg.BuildAudit, buildAudit: cfg.BuildAudit,
logger: cfg.Logger.With("component", "queue-maintenance"),
staleTaskTimeout: cfg.StaleTaskTimeout, staleTaskTimeout: cfg.StaleTaskTimeout,
staleWorkerTimeout: cfg.StaleWorkerTimeout, staleWorkerTimeout: cfg.StaleWorkerTimeout,
cleanupAge: cfg.CleanupAge, cleanupAge: cfg.CleanupAge,
@ -102,7 +97,8 @@ func NewQueueMaintenance(
// Start begins the maintenance and metrics loops. // Start begins the maintenance and metrics loops.
func (m *QueueMaintenance) Start() { func (m *QueueMaintenance) Start() {
m.logger.Info("queue maintenance started", log := logging.FromContext(m.ctx).WithWorker("queue-maintenance")
log.Info("queue maintenance started",
"maintenance_period", m.maintenancePeriod, "maintenance_period", m.maintenancePeriod,
"metrics_period", m.metricsPeriod, "metrics_period", m.metricsPeriod,
"stale_task_timeout", m.staleTaskTimeout, "stale_task_timeout", m.staleTaskTimeout,
@ -117,10 +113,11 @@ func (m *QueueMaintenance) Start() {
// Stop gracefully shuts down the maintenance worker. // Stop gracefully shuts down the maintenance worker.
func (m *QueueMaintenance) Stop() { func (m *QueueMaintenance) Stop() {
m.logger.Info("queue maintenance stopping") log := logging.FromContext(m.ctx).WithWorker("queue-maintenance")
log.Info("queue maintenance stopping")
m.cancel() m.cancel()
m.wg.Wait() m.wg.Wait()
m.logger.Info("queue maintenance stopped") log.Info("queue maintenance stopped")
} }
// maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup. // maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup.
@ -177,26 +174,28 @@ func (m *QueueMaintenance) runMaintenance() {
// (the worker likely crashed without reporting). // (the worker likely crashed without reporting).
// Also syncs build audit to pending status if configured. // Also syncs build audit to pending status if configured.
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) { func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
log := logging.FromContext(ctx).WithWorker("queue-maintenance")
// Use RequeueStaleWithIDs to get task IDs for build audit sync // Use RequeueStaleWithIDs to get task IDs for build audit sync
taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout) taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout)
if err != nil { if err != nil {
m.logger.Warn("failed to requeue stale tasks", "error", err) log.Warn("failed to requeue stale tasks", logging.FieldError, err)
return return
} }
if len(taskIDs) == 0 { if len(taskIDs) == 0 {
return return
} }
m.logger.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs) log.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs)
// Sync build audit status if configured // Sync build audit status if configured
if m.buildAudit != nil { if m.buildAudit != nil {
for _, taskID := range taskIDs { for _, taskID := range taskIDs {
// Update build audit to pending (worker assignment cleared) // Update build audit to pending (worker assignment cleared)
if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil { if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil {
m.logger.Warn("failed to sync build audit for requeued task", log.Warn("failed to sync build audit for requeued task",
"task_id", taskID, "task_id", taskID,
"error", err, logging.FieldError, err,
) )
} }
} }
@ -205,25 +204,29 @@ func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
// markStaleWorkers marks workers without recent heartbeats as offline. // markStaleWorkers marks workers without recent heartbeats as offline.
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) { func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) {
log := logging.FromContext(ctx).WithWorker("queue-maintenance")
count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout) count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout)
if err != nil { if err != nil {
m.logger.Warn("failed to mark stale workers offline", "error", err) log.Warn("failed to mark stale workers offline", logging.FieldError, err)
return return
} }
if count > 0 { if count > 0 {
m.logger.Info("marked stale workers offline", "count", count) log.Info("marked stale workers offline", "count", count)
} }
} }
// cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age. // cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age.
func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) { func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) {
log := logging.FromContext(ctx).WithWorker("queue-maintenance")
count, err := m.queue.CleanupOld(ctx, m.cleanupAge) count, err := m.queue.CleanupOld(ctx, m.cleanupAge)
if err != nil { if err != nil {
m.logger.Warn("failed to cleanup old tasks", "error", err) log.Warn("failed to cleanup old tasks", logging.FieldError, err)
return return
} }
if count > 0 { if count > 0 {
m.logger.Info("cleaned up old tasks", "count", count) log.Info("cleaned up old tasks", "count", count)
} }
} }
@ -232,9 +235,11 @@ func (m *QueueMaintenance) refreshMetrics() {
ctx, cancel := context.WithTimeout(m.ctx, TimeoutQuickOp) ctx, cancel := context.WithTimeout(m.ctx, TimeoutQuickOp)
defer cancel() defer cancel()
log := logging.FromContext(ctx).WithWorker("queue-maintenance")
stats, err := m.queue.GetStats(ctx) stats, err := m.queue.GetStats(ctx)
if err != nil { if err != nil {
m.logger.Warn("failed to get queue stats for metrics", "error", err) log.Warn("failed to get queue stats for metrics", logging.FieldError, err)
return return
} }
@ -247,7 +252,7 @@ func (m *QueueMaintenance) refreshMetrics() {
// Worker counts // Worker counts
workers, err := m.registry.List(ctx, port.WorkerFilter{}) workers, err := m.registry.List(ctx, port.WorkerFilter{})
if err != nil { if err != nil {
m.logger.Warn("failed to list workers for metrics", "error", err) log.Warn("failed to list workers for metrics", logging.FieldError, err)
return return
} }

View File

@ -3,7 +3,6 @@ package worker
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -212,7 +211,6 @@ func TestQueueMaintenance_RunMaintenance(t *testing.T) {
CleanupAge: 7 * 24 * time.Hour, CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 1 * time.Hour, // won't fire in test MaintenancePeriod: 1 * time.Hour, // won't fire in test
MetricsPeriod: 1 * time.Hour, // won't fire in test MetricsPeriod: 1 * time.Hour, // won't fire in test
Logger: slog.Default(),
} }
m := NewQueueMaintenance(queue, registry, cfg) m := NewQueueMaintenance(queue, registry, cfg)
@ -246,7 +244,6 @@ func TestQueueMaintenance_RefreshMetrics(t *testing.T) {
CleanupAge: 7 * 24 * time.Hour, CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 1 * time.Hour, MaintenancePeriod: 1 * time.Hour,
MetricsPeriod: 1 * time.Hour, MetricsPeriod: 1 * time.Hour,
Logger: slog.Default(),
} }
m := NewQueueMaintenance(queue, registry, cfg) m := NewQueueMaintenance(queue, registry, cfg)
@ -271,7 +268,6 @@ func TestQueueMaintenance_StartStop(t *testing.T) {
CleanupAge: 7 * 24 * time.Hour, CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 50 * time.Millisecond, MaintenancePeriod: 50 * time.Millisecond,
MetricsPeriod: 50 * time.Millisecond, MetricsPeriod: 50 * time.Millisecond,
Logger: slog.Default(),
} }
m := NewQueueMaintenance(queue, registry, cfg) m := NewQueueMaintenance(queue, registry, cfg)

View File

@ -4,12 +4,12 @@ package worker
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"log/slog"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -20,7 +20,6 @@ type QueueProcessor struct {
projects port.ProjectRepository projects port.ProjectRepository
streams port.StreamPublisher streams port.StreamPublisher
webhookDispatcher port.WebhookDispatcher webhookDispatcher port.WebhookDispatcher
logger *slog.Logger
pollPeriod time.Duration pollPeriod time.Duration
// Shutdown management // Shutdown management
@ -36,14 +35,12 @@ type QueueProcessor struct {
// QueueProcessorConfig holds configuration for the queue processor. // QueueProcessorConfig holds configuration for the queue processor.
type QueueProcessorConfig struct { type QueueProcessorConfig struct {
PollPeriod time.Duration PollPeriod time.Duration
Logger *slog.Logger
} }
// DefaultQueueProcessorConfig returns sensible defaults. // DefaultQueueProcessorConfig returns sensible defaults.
func DefaultQueueProcessorConfig() *QueueProcessorConfig { func DefaultQueueProcessorConfig() *QueueProcessorConfig {
return &QueueProcessorConfig{ return &QueueProcessorConfig{
PollPeriod: 5 * time.Second, PollPeriod: 5 * time.Second,
Logger: slog.Default(),
} }
} }
@ -66,7 +63,6 @@ func NewQueueProcessor(
executor: executor, executor: executor,
projects: projects, projects: projects,
streams: streams, streams: streams,
logger: cfg.Logger,
pollPeriod: cfg.PollPeriod, pollPeriod: cfg.PollPeriod,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -83,7 +79,8 @@ func (p *QueueProcessor) WithWebhookDispatcher(dispatcher port.WebhookDispatcher
// Start begins processing the command queue. // Start begins processing the command queue.
// It spawns a worker for each known project. // It spawns a worker for each known project.
func (p *QueueProcessor) Start() error { func (p *QueueProcessor) Start() error {
p.logger.Info("queue processor starting") log := logging.FromContext(p.ctx).WithWorker("queue-processor")
log.Info("queue processor starting")
// Start the main coordinator that manages per-project workers // Start the main coordinator that manages per-project workers
p.wg.Add(1) p.wg.Add(1)
@ -94,16 +91,18 @@ func (p *QueueProcessor) Start() error {
// Stop gracefully shuts down the queue processor. // Stop gracefully shuts down the queue processor.
func (p *QueueProcessor) Stop() { func (p *QueueProcessor) Stop() {
p.logger.Info("queue processor stopping") log := logging.FromContext(p.ctx).WithWorker("queue-processor")
log.Info("queue processor stopping")
p.cancel() p.cancel()
p.wg.Wait() p.wg.Wait()
p.logger.Info("queue processor stopped") log.Info("queue processor stopped")
} }
// coordinator manages per-project workers, starting new ones as projects are discovered. // coordinator manages per-project workers, starting new ones as projects are discovered.
func (p *QueueProcessor) coordinator() { func (p *QueueProcessor) coordinator() {
defer p.wg.Done() defer p.wg.Done()
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
ticker := time.NewTicker(p.pollPeriod) ticker := time.NewTicker(p.pollPeriod)
defer ticker.Stop() defer ticker.Stop()
@ -116,7 +115,7 @@ func (p *QueueProcessor) coordinator() {
// Stop all project workers // Stop all project workers
p.projectMu.Lock() p.projectMu.Lock()
for projectID, cancel := range p.projectWorkers { for projectID, cancel := range p.projectWorkers {
p.logger.Debug("stopping worker", "project", projectID) log.Debug("stopping worker", "project", projectID)
cancel() cancel()
} }
p.projectMu.Unlock() p.projectMu.Unlock()
@ -129,9 +128,11 @@ func (p *QueueProcessor) coordinator() {
// refreshProjectWorkers ensures each known project has a worker. // refreshProjectWorkers ensures each known project has a worker.
func (p *QueueProcessor) refreshProjectWorkers() { func (p *QueueProcessor) refreshProjectWorkers() {
log := logging.FromContext(p.ctx).WithWorker("queue-processor")
projects, err := p.projects.List(p.ctx) projects, err := p.projects.List(p.ctx)
if err != nil { if err != nil {
p.logger.Warn("failed to list projects for queue processing", "error", err) log.Warn("failed to list projects for queue processing", logging.FieldError, err)
return return
} }
@ -146,7 +147,7 @@ func (p *QueueProcessor) refreshProjectWorkers() {
p.projectWorkers[projectID] = workerCancel p.projectWorkers[projectID] = workerCancel
p.wg.Add(1) p.wg.Add(1)
go p.projectWorker(workerCtx, projectID) go p.projectWorker(workerCtx, projectID)
p.logger.Info("started queue worker", "project", projectID) log.Info("started queue worker", "project", projectID)
} }
} }
@ -168,7 +169,7 @@ func (p *QueueProcessor) projectWorker(ctx context.Context, projectID string) {
case <-ticker.C: case <-ticker.C:
// Try to dequeue and process a command // Try to dequeue and process a command
if err := p.processNextCommand(ctx, projectID); err != nil { if err := p.processNextCommand(ctx, projectID); err != nil {
p.logger.Warn("error processing command", "project", projectID, "error", err) logging.FromContext(ctx).WithWorker("queue-processor").Warn("error processing command", "project", projectID, "error", err)
} }
} }
} }
@ -185,7 +186,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin
return nil // No commands pending return nil // No commands pending
} }
p.logger.Info("processing queued command", logging.FromContext(ctx).WithWorker("queue-processor").Info("processing queued command",
"command_id", cmd.ID, "command_id", cmd.ID,
"project", projectID, "project", projectID,
"type", cmd.CommandType, "type", cmd.CommandType,
@ -294,7 +295,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin
// Update command status // Update command status
if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil { if err := p.queue.UpdateStatus(ctx, cmd.ID, finalStatus, queueResult); err != nil {
p.logger.Warn("failed to update command status", "command_id", cmd.ID, "error", err) logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to update command status", "command_id", cmd.ID, "error", err)
} }
// Publish completion event // Publish completion event
@ -327,7 +328,7 @@ func (p *QueueProcessor) processNextCommand(ctx context.Context, projectID strin
Error: queueResult.Error, Error: queueResult.Error,
}) })
p.logger.Info("completed queued command", logging.FromContext(ctx).WithWorker("queue-processor").Info("completed queued command",
"command_id", cmd.ID, "command_id", cmd.ID,
"project", projectID, "project", projectID,
"status", finalStatus, "status", finalStatus,
@ -357,7 +358,7 @@ func (p *QueueProcessor) dispatchWebhookEvent(ctx context.Context, projectID str
} }
if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil { if err := p.webhookDispatcher.Dispatch(ctx, projectID, event); err != nil {
p.logger.Warn("failed to dispatch webhook event", logging.FromContext(ctx).WithWorker("queue-processor").Warn("failed to dispatch webhook event",
"project_id", projectID, "project_id", projectID,
"event_type", eventType, "event_type", eventType,
"error", err, "error", err,

View File

@ -5,12 +5,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"os/exec" "os/exec"
"strings" "strings"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
) )
// SDLCTaskExecutor handles WorkTaskTypeSDLC tasks by executing sdlc CLI commands // SDLCTaskExecutor handles WorkTaskTypeSDLC tasks by executing sdlc CLI commands
@ -19,7 +19,6 @@ import (
type SDLCTaskExecutor struct { type SDLCTaskExecutor struct {
podGitOps *PodGitOperations podGitOps *PodGitOperations
namespace string namespace string
logger *slog.Logger
} }
// SDLCTaskExecutorConfig holds configuration for the SDLC task executor. // SDLCTaskExecutorConfig holds configuration for the SDLC task executor.
@ -29,20 +28,13 @@ type SDLCTaskExecutorConfig struct {
// PodGitOps provides git clone/commit/push operations. // PodGitOps provides git clone/commit/push operations.
PodGitOps *PodGitOperations PodGitOps *PodGitOperations
Logger *slog.Logger
} }
// NewSDLCTaskExecutor creates a new SDLC task executor. // NewSDLCTaskExecutor creates a new SDLC task executor.
func NewSDLCTaskExecutor(cfg SDLCTaskExecutorConfig) *SDLCTaskExecutor { func NewSDLCTaskExecutor(cfg SDLCTaskExecutorConfig) *SDLCTaskExecutor {
logger := cfg.Logger
if logger == nil {
logger = slog.Default()
}
return &SDLCTaskExecutor{ return &SDLCTaskExecutor{
podGitOps: cfg.PodGitOps, podGitOps: cfg.PodGitOps,
namespace: cfg.Namespace, namespace: cfg.Namespace,
logger: logger.With("component", "sdlc-task-executor"),
} }
} }
@ -50,6 +42,7 @@ func NewSDLCTaskExecutor(cfg SDLCTaskExecutorConfig) *SDLCTaskExecutor {
// and optionally committing/pushing changes. // and optionally committing/pushing changes.
func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
start := time.Now() start := time.Now()
log := logging.FromContext(ctx).WithWorker("sdlc-task-executor")
spec, err := e.parseSpec(task.Spec) spec, err := e.parseSpec(task.Spec)
if err != nil { if err != nil {
@ -66,11 +59,11 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *
// Working directory in the pod // Working directory in the pod
workDir := "/workspace" workDir := "/workspace"
e.logger.Info("executing SDLC task", log.Info("executing SDLC task",
"task_id", task.ID, "task_id", task.ID,
"project_id", task.ProjectID, logging.FieldProjectID, task.ProjectID,
"command", spec.Command, "command", spec.Command,
"pod", podName, logging.FieldPodName, podName,
) )
// 1. Clone repo to worker pod // 1. Clone repo to worker pod
@ -93,9 +86,9 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *
// 2. Ensure .sdlc/ is initialized (auto-init for skeleton projects) // 2. Ensure .sdlc/ is initialized (auto-init for skeleton projects)
if err := e.ensureSDLCInit(ctx, podName, workDir); err != nil { if err := e.ensureSDLCInit(ctx, podName, workDir); err != nil {
e.logger.Warn("sdlc init check failed, continuing anyway", log.Warn("sdlc init check failed, continuing anyway",
"task_id", task.ID, "task_id", task.ID,
"error", err, logging.FieldError, err,
) )
} }
@ -128,7 +121,7 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *
if gitResult.HasChanges { if gitResult.HasChanges {
result.CommitSHA = gitResult.CommitSHA result.CommitSHA = gitResult.CommitSHA
result.FilesChanged = gitResult.FilesChanged result.FilesChanged = gitResult.FilesChanged
e.logger.Info("SDLC changes committed", log.Info("SDLC changes committed",
"task_id", task.ID, "task_id", task.ID,
"commit", gitResult.CommitSHA, "commit", gitResult.CommitSHA,
"files", len(gitResult.FilesChanged), "files", len(gitResult.FilesChanged),
@ -137,10 +130,10 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *
} }
} }
e.logger.Info("SDLC task completed", log.Info("SDLC task completed",
"task_id", task.ID, "task_id", task.ID,
"command", spec.Command, "command", spec.Command,
"duration_ms", result.DurationMs, logging.FieldDuration, result.DurationMs,
) )
return result return result
@ -149,6 +142,8 @@ func (e *SDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *
// ensureSDLCInit checks if .sdlc/ exists and runs `sdlc init` if it doesn't. // ensureSDLCInit checks if .sdlc/ exists and runs `sdlc init` if it doesn't.
// This enables SDLC operations on skeleton projects that don't have .sdlc/ pre-initialized. // This enables SDLC operations on skeleton projects that don't have .sdlc/ pre-initialized.
func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir string) error { func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir string) error {
log := logging.FromContext(ctx).WithWorker("sdlc-task-executor")
// Check if .sdlc/ directory exists // Check if .sdlc/ directory exists
checkArgs := []string{ checkArgs := []string{
"exec", "-n", e.namespace, podName, "--", "exec", "-n", e.namespace, podName, "--",
@ -169,7 +164,7 @@ func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir
} }
// Run sdlc init // Run sdlc init
e.logger.Info("initializing .sdlc directory", "pod", podName, "workDir", workDir) log.Info("initializing .sdlc directory", logging.FieldPodName, podName, "workDir", workDir)
initArgs := []string{ initArgs := []string{
"exec", "-n", e.namespace, podName, "--", "exec", "-n", e.namespace, podName, "--",
@ -186,7 +181,7 @@ func (e *SDLCTaskExecutor) ensureSDLCInit(ctx context.Context, podName, workDir
return fmt.Errorf("sdlc init failed: %w: %s", err, initStderr.String()) return fmt.Errorf("sdlc init failed: %w: %s", err, initStderr.String())
} }
e.logger.Info("sdlc initialized", "pod", podName, "output", initStdout.String()) log.Info("sdlc initialized", logging.FieldPodName, podName, "output", initStdout.String())
return nil return nil
} }

View File

@ -4,11 +4,11 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"strings" "strings"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/port"
) )
@ -28,7 +28,6 @@ const (
type VerifyExecutor struct { type VerifyExecutor struct {
cmdExecutor port.CommandExecutor // kubectl exec wrapper cmdExecutor port.CommandExecutor // kubectl exec wrapper
streams port.StreamPublisher // SSE stream publisher for real-time events streams port.StreamPublisher // SSE stream publisher for real-time events
logger *slog.Logger
namespace string // Kubernetes namespace for the pod namespace string // Kubernetes namespace for the pod
podName string // Playwright pod name (e.g., "playwright-0") podName string // Playwright pod name (e.g., "playwright-0")
} }
@ -43,12 +42,8 @@ type VerifyExecutorConfig struct {
func NewVerifyExecutor( func NewVerifyExecutor(
cmdExecutor port.CommandExecutor, cmdExecutor port.CommandExecutor,
streams port.StreamPublisher, streams port.StreamPublisher,
logger *slog.Logger,
cfg *VerifyExecutorConfig, cfg *VerifyExecutorConfig,
) *VerifyExecutor { ) *VerifyExecutor {
if logger == nil {
logger = slog.Default()
}
if cfg == nil { if cfg == nil {
cfg = &VerifyExecutorConfig{ cfg = &VerifyExecutorConfig{
Namespace: "rdev", Namespace: "rdev",
@ -64,7 +59,6 @@ func NewVerifyExecutor(
return &VerifyExecutor{ return &VerifyExecutor{
cmdExecutor: cmdExecutor, cmdExecutor: cmdExecutor,
streams: streams, streams: streams,
logger: logger.With("component", "verify-executor"),
namespace: cfg.Namespace, namespace: cfg.Namespace,
podName: cfg.PodName, podName: cfg.PodName,
} }
@ -72,6 +66,7 @@ func NewVerifyExecutor(
// Execute runs a verify task by capturing screenshots/video of a URL. // Execute runs a verify task by capturing screenshots/video of a URL.
func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
log := logging.FromContext(ctx).WithWorker("verify-executor")
start := time.Now() start := time.Now()
streamID := task.ID // Use task ID as stream ID for SSE streamID := task.ID // Use task ID as stream ID for SSE
@ -109,7 +104,7 @@ func (v *VerifyExecutor) Execute(ctx context.Context, task *domain.WorkTask) *do
"viewports": spec.Viewports, "viewports": spec.Viewports,
}) })
v.logger.Info("executing verify capture", log.Info("executing verify capture",
"task_id", task.ID, "task_id", task.ID,
"project_id", task.ProjectID, "project_id", task.ProjectID,
"url", spec.URL, "url", spec.URL,

View File

@ -16,7 +16,7 @@ func TestVerifyExecutor_Execute_Success(t *testing.T) {
{Stream: "stdout", Line: `{"screenshots":{"1920x1080":"/captures/task-1/1920_1080.png","375x667":"/captures/task-1/375_667.png"},"video":"/captures/task-1/recording.webm"}`, Timestamp: time.Now()}, {Stream: "stdout", Line: `{"screenshots":{"1920x1080":"/captures/task-1/1920_1080.png","375x667":"/captures/task-1/375_667.png"},"video":"/captures/task-1/recording.webm"}`, Timestamp: time.Now()},
} }
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -51,7 +51,7 @@ func TestVerifyExecutor_Execute_Success(t *testing.T) {
func TestVerifyExecutor_Execute_URLRequired(t *testing.T) { func TestVerifyExecutor_Execute_URLRequired(t *testing.T) {
cmdExec := newMockCommandExecutor() cmdExec := newMockCommandExecutor()
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -72,7 +72,7 @@ func TestVerifyExecutor_Execute_URLRequired(t *testing.T) {
func TestVerifyExecutor_Execute_InvalidURL(t *testing.T) { func TestVerifyExecutor_Execute_InvalidURL(t *testing.T) {
cmdExec := newMockCommandExecutor() cmdExec := newMockCommandExecutor()
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -94,7 +94,7 @@ func TestVerifyExecutor_Execute_CaptureFailure(t *testing.T) {
cmdExec := newMockCommandExecutor() cmdExec := newMockCommandExecutor()
cmdExec.err = fmt.Errorf("kubectl exec failed: connection refused") cmdExec.err = fmt.Errorf("kubectl exec failed: connection refused")
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -122,7 +122,7 @@ func TestVerifyExecutor_Execute_NonZeroExitCode(t *testing.T) {
DurationMs: 100, DurationMs: 100,
} }
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -146,7 +146,7 @@ func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) {
{Stream: "stdout", Line: "not valid json", Timestamp: time.Now()}, {Stream: "stdout", Line: "not valid json", Timestamp: time.Now()},
} }
exec := NewVerifyExecutor(cmdExec, nil, nil, nil) exec := NewVerifyExecutor(cmdExec, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -165,7 +165,7 @@ func TestVerifyExecutor_Execute_InvalidManifestJSON(t *testing.T) {
} }
func TestVerifyExecutor_ParseSpec(t *testing.T) { func TestVerifyExecutor_ParseSpec(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil) exec := NewVerifyExecutor(nil, nil, nil)
t.Run("valid spec with all fields", func(t *testing.T) { t.Run("valid spec with all fields", func(t *testing.T) {
spec, err := exec.parseSpec(map[string]any{ spec, err := exec.parseSpec(map[string]any{
@ -238,7 +238,7 @@ func TestVerifyExecutor_ParseSpec(t *testing.T) {
} }
func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) { func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil) exec := NewVerifyExecutor(nil, nil, nil)
spec := &domain.VerifySpec{ spec := &domain.VerifySpec{
URL: "https://example.com/page", URL: "https://example.com/page",
@ -309,7 +309,7 @@ func TestVerifyExecutor_BuildCaptureCommand(t *testing.T) {
func TestVerifyExecutor_Config(t *testing.T) { func TestVerifyExecutor_Config(t *testing.T) {
t.Run("default config", func(t *testing.T) { t.Run("default config", func(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, nil) exec := NewVerifyExecutor(nil, nil, nil)
if exec.namespace != "rdev" { if exec.namespace != "rdev" {
t.Errorf("namespace = %q, want 'rdev'", exec.namespace) t.Errorf("namespace = %q, want 'rdev'", exec.namespace)
} }
@ -319,7 +319,7 @@ func TestVerifyExecutor_Config(t *testing.T) {
}) })
t.Run("custom config", func(t *testing.T) { t.Run("custom config", func(t *testing.T) {
exec := NewVerifyExecutor(nil, nil, nil, &VerifyExecutorConfig{ exec := NewVerifyExecutor(nil, nil, &VerifyExecutorConfig{
Namespace: "custom-ns", Namespace: "custom-ns",
PodName: "custom-pod-0", PodName: "custom-pod-0",
}) })

View File

@ -4,13 +4,13 @@ package worker
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"os" "os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/service" "github.com/orchard9/rdev/internal/service"
) )
@ -24,7 +24,6 @@ type WorkExecutor struct {
buildExec *BuildExecutor buildExec *BuildExecutor
verifyExec *VerifyExecutor verifyExec *VerifyExecutor
sdlcExec *SDLCTaskExecutor sdlcExec *SDLCTaskExecutor
logger *slog.Logger
workerID string workerID string
hostname string hostname string
@ -61,8 +60,6 @@ type WorkExecutorConfig struct {
// TaskTimeout is the maximum time a single task may run. // TaskTimeout is the maximum time a single task may run.
// Default: 15 minutes. // Default: 15 minutes.
TaskTimeout time.Duration TaskTimeout time.Duration
Logger *slog.Logger
} }
// DefaultWorkExecutorConfig returns sensible defaults. // DefaultWorkExecutorConfig returns sensible defaults.
@ -77,7 +74,6 @@ func DefaultWorkExecutorConfig() *WorkExecutorConfig {
PollPeriod: 5 * time.Second, PollPeriod: 5 * time.Second,
HeartbeatPeriod: 30 * time.Second, HeartbeatPeriod: 30 * time.Second,
TaskTimeout: 15 * time.Minute, TaskTimeout: 15 * time.Minute,
Logger: slog.Default(),
} }
} }
@ -117,7 +113,6 @@ func NewWorkExecutor(
buildExec: buildExec, buildExec: buildExec,
verifyExec: verifyExec, verifyExec: verifyExec,
sdlcExec: sdlcExec, sdlcExec: sdlcExec,
logger: cfg.Logger.With("component", "work-executor"),
workerID: cfg.WorkerID, workerID: cfg.WorkerID,
hostname: hostname, hostname: hostname,
version: cfg.Version, version: cfg.Version,
@ -136,6 +131,8 @@ func (e *WorkExecutor) Start() error {
return fmt.Errorf("executor already started") return fmt.Errorf("executor already started")
} }
log := logging.FromContext(e.ctx).WithWorker("work-executor")
// Register this worker in the pool // Register this worker in the pool
worker := &domain.Worker{ worker := &domain.Worker{
ID: e.workerID, ID: e.workerID,
@ -147,7 +144,7 @@ func (e *WorkExecutor) Start() error {
return err return err
} }
e.logger.Info("work executor started", log.Info("work executor started",
"worker_id", e.workerID, "worker_id", e.workerID,
"poll_period", e.pollPeriod, "poll_period", e.pollPeriod,
"heartbeat_period", e.hbPeriod, "heartbeat_period", e.hbPeriod,
@ -166,7 +163,8 @@ func (e *WorkExecutor) Start() error {
// Stop gracefully shuts down the executor. // Stop gracefully shuts down the executor.
func (e *WorkExecutor) Stop() { func (e *WorkExecutor) Stop() {
e.logger.Info("work executor stopping", "worker_id", e.workerID) log := logging.FromContext(e.ctx).WithWorker("work-executor")
log.Info("work executor stopping", "worker_id", e.workerID)
e.cancel() e.cancel()
e.wg.Wait() e.wg.Wait()
@ -174,10 +172,10 @@ func (e *WorkExecutor) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), TimeoutQuickOp) ctx, cancel := context.WithTimeout(context.Background(), TimeoutQuickOp)
defer cancel() defer cancel()
if err := e.workerSvc.Deregister(ctx, e.workerID); err != nil { if err := e.workerSvc.Deregister(ctx, e.workerID); err != nil {
e.logger.Warn("failed to deregister worker", "error", err) logging.FromContext(ctx).WithWorker("work-executor").Warn("failed to deregister worker", "error", err)
} }
e.logger.Info("work executor stopped", "worker_id", e.workerID) log.Info("work executor stopped", "worker_id", e.workerID)
} }
// WorkerID returns the executor's worker ID. // WorkerID returns the executor's worker ID.
@ -194,6 +192,7 @@ func (e *WorkExecutor) Running() bool {
func (e *WorkExecutor) heartbeatLoop() { func (e *WorkExecutor) heartbeatLoop() {
defer e.wg.Done() defer e.wg.Done()
log := logging.FromContext(e.ctx).WithWorker("work-executor")
ticker := time.NewTicker(e.hbPeriod) ticker := time.NewTicker(e.hbPeriod)
defer ticker.Stop() defer ticker.Stop()
@ -203,7 +202,7 @@ func (e *WorkExecutor) heartbeatLoop() {
return return
case <-ticker.C: case <-ticker.C:
if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil { if err := e.workerSvc.Heartbeat(e.ctx, e.workerID); err != nil {
e.logger.Warn("heartbeat failed", "error", err) log.Warn("heartbeat failed", "error", err)
} }
} }
} }
@ -228,16 +227,18 @@ func (e *WorkExecutor) pollLoop() {
// tryClaimAndExecute attempts to claim a task and execute it. // tryClaimAndExecute attempts to claim a task and execute it.
func (e *WorkExecutor) tryClaimAndExecute() { func (e *WorkExecutor) tryClaimAndExecute() {
log := logging.FromContext(e.ctx).WithWorker("work-executor")
task, err := e.workerSvc.ClaimTask(e.ctx, e.workerID) task, err := e.workerSvc.ClaimTask(e.ctx, e.workerID)
if err != nil { if err != nil {
e.logger.Warn("failed to claim task", "error", err) log.Warn("failed to claim task", "error", err)
return return
} }
if task == nil { if task == nil {
return // No tasks available return // No tasks available
} }
e.logger.Info("executing task", log.Info("executing task",
"task_id", task.ID, "task_id", task.ID,
"project_id", task.ProjectID, "project_id", task.ProjectID,
"type", task.Type, "type", task.Type,
@ -257,7 +258,7 @@ func (e *WorkExecutor) tryClaimAndExecute() {
if result.Success { if result.Success {
if err := e.workerSvc.CompleteTask(e.ctx, e.workerID, task.ID, result); err != nil { if err := e.workerSvc.CompleteTask(e.ctx, e.workerID, task.ID, result); err != nil {
e.logger.Error("failed to complete task", log.Error("failed to complete task",
"task_id", task.ID, "task_id", task.ID,
"error", err, "error", err,
) )
@ -269,7 +270,7 @@ func (e *WorkExecutor) tryClaimAndExecute() {
} }
if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil { if err := e.workerSvc.FailTask(e.ctx, e.workerID, task.ID, result, e.workSvc); err != nil {
e.logger.Error("failed to record task failure", log.Error("failed to record task failure",
"task_id", task.ID, "task_id", task.ID,
"error", err, "error", err,
) )

View File

@ -3,21 +3,12 @@ package worker
import ( import (
"context" "context"
"fmt" "fmt"
"log/slog"
"testing" "testing"
"time" "time"
"github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/domain"
) )
// =============================================================================
// WorkExecutor Tests
// =============================================================================
func testLogger() *slog.Logger {
return slog.Default()
}
func TestWorkExecutor_StartAndStop(t *testing.T) { func TestWorkExecutor_StartAndStop(t *testing.T) {
deps := newTestDeps() deps := newTestDeps()
@ -25,7 +16,6 @@ func TestWorkExecutor_StartAndStop(t *testing.T) {
WorkerID: "test-worker-1", WorkerID: "test-worker-1",
PollPeriod: 100 * time.Millisecond, PollPeriod: 100 * time.Millisecond,
HeartbeatPeriod: 100 * time.Millisecond, HeartbeatPeriod: 100 * time.Millisecond,
Logger: testLogger(),
}) })
if err := executor.Start(); err != nil { if err := executor.Start(); err != nil {
@ -79,7 +69,6 @@ func TestWorkExecutor_ClaimsAndExecutesTask(t *testing.T) {
WorkerID: "test-worker-2", WorkerID: "test-worker-2",
PollPeriod: 50 * time.Millisecond, PollPeriod: 50 * time.Millisecond,
HeartbeatPeriod: 5 * time.Second, HeartbeatPeriod: 5 * time.Second,
Logger: testLogger(),
}) })
// Register the worker (normally done by Start) then call tryClaimAndExecute directly // Register the worker (normally done by Start) then call tryClaimAndExecute directly
@ -122,7 +111,6 @@ func TestWorkExecutor_FailsTaskOnAgentError(t *testing.T) {
WorkerID: "test-worker-3", WorkerID: "test-worker-3",
PollPeriod: 50 * time.Millisecond, PollPeriod: 50 * time.Millisecond,
HeartbeatPeriod: 5 * time.Second, HeartbeatPeriod: 5 * time.Second,
Logger: testLogger(),
}) })
if err := executor.Start(); err != nil { if err := executor.Start(); err != nil {
@ -168,7 +156,6 @@ func TestWorkExecutor_UnsupportedTaskType(t *testing.T) {
WorkerID: "test-worker-4", WorkerID: "test-worker-4",
PollPeriod: 50 * time.Millisecond, PollPeriod: 50 * time.Millisecond,
HeartbeatPeriod: 5 * time.Second, HeartbeatPeriod: 5 * time.Second,
Logger: testLogger(),
}) })
if err := executor.Start(); err != nil { if err := executor.Start(); err != nil {
@ -200,7 +187,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
result: &domain.AgentResult{ExitCode: 0, DurationMs: 500}, result: &domain.AgentResult{ExitCode: 0, DurationMs: 500},
} }
registry := &mockCodeAgentRegistry{agent: agent} registry := &mockCodeAgentRegistry{agent: agent}
exec := NewBuildExecutor(registry, nil, nil, nil, nil) exec := NewBuildExecutor(registry, nil, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -220,7 +207,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
t.Run("missing prompt", func(t *testing.T) { t.Run("missing prompt", func(t *testing.T) {
registry := &mockCodeAgentRegistry{agent: &mockCodeAgent{}} registry := &mockCodeAgentRegistry{agent: &mockCodeAgent{}}
exec := NewBuildExecutor(registry, nil, nil, nil, nil) exec := NewBuildExecutor(registry, nil, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -236,7 +223,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
t.Run("no agent available", func(t *testing.T) { t.Run("no agent available", func(t *testing.T) {
registry := &mockCodeAgentRegistry{agent: nil} registry := &mockCodeAgentRegistry{agent: nil}
exec := NewBuildExecutor(registry, nil, nil, nil, nil) exec := NewBuildExecutor(registry, nil, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -253,7 +240,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
t.Run("agent execution error", func(t *testing.T) { t.Run("agent execution error", func(t *testing.T) {
agent := &mockCodeAgent{err: fmt.Errorf("connection refused")} agent := &mockCodeAgent{err: fmt.Errorf("connection refused")}
registry := &mockCodeAgentRegistry{agent: agent} registry := &mockCodeAgentRegistry{agent: agent}
exec := NewBuildExecutor(registry, nil, nil, nil, nil) exec := NewBuildExecutor(registry, nil, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -275,7 +262,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
result: &domain.AgentResult{ExitCode: 1, DurationMs: 500}, result: &domain.AgentResult{ExitCode: 1, DurationMs: 500},
} }
registry := &mockCodeAgentRegistry{agent: agent} registry := &mockCodeAgentRegistry{agent: agent}
exec := NewBuildExecutor(registry, nil, nil, nil, nil) exec := NewBuildExecutor(registry, nil, nil, nil)
task := &domain.WorkTask{ task := &domain.WorkTask{
ID: "task-1", ID: "task-1",
@ -291,7 +278,7 @@ func TestBuildExecutor_Execute(t *testing.T) {
} }
func TestBuildExecutor_ParseSpec(t *testing.T) { func TestBuildExecutor_ParseSpec(t *testing.T) {
exec := NewBuildExecutor(nil, nil, nil, nil, nil) exec := NewBuildExecutor(nil, nil, nil, nil)
t.Run("valid spec", func(t *testing.T) { t.Run("valid spec", func(t *testing.T) {
spec, err := exec.parseSpec(map[string]any{ spec, err := exec.parseSpec(map[string]any{

View File

@ -1,75 +0,0 @@
# Slack Preparation Phase 2 (Execution Plan)
> Based on `slack-preparation-thoughts.md`, this plan details the specific files and commands needed to enable `slackpath-1` (Identity) and `slackpath-2` (Async Workers).
## 1. Missing Templates (Priority: High)
We must implement these templates in `internal/adapter/templates/templates/` so the Tree Runner can execute `add-worker`, `add-db`, and `add-redis`.
### `worker` Template
* **Location:** `internal/adapter/templates/templates/worker`
* **Structure:** Go service similar to `go-api` but optimized for long-running tasks.
* **Key File:** `cmd/worker/main.go` (starts a queue consumer instead of an HTTP server).
* **Dockerfile:** Standard Go build.
* **Helm Chart:** Deployment (not Service/Ingress).
### `postgres` Template
* **Location:** `internal/adapter/templates/templates/postgres`
* **Nature:** Infrastructure Wrapper.
* **Helm Chart:** Dependencies on `bitnami/postgresql` or a custom StatefulSet.
* **Outputs:** Connection string secret.
### `redis` Template
* **Location:** `internal/adapter/templates/templates/redis`
* **Nature:** Infrastructure Wrapper.
* **Helm Chart:** Dependencies on `bitnami/redis`.
* **Outputs:** Host/Port secrets.
## 2. Missing Shared Packages (Priority: Critical)
The agent needs standard libraries to avoid writing insecure auth or buggy queues from scratch. These go in `pkg/` at the root of the project templates (or the `skeleton`).
### `pkg/auth` (for Path 1)
* **JWT Handling:** `Sign(claims) string`, `Parse(token) (*Claims, error)`.
* **Middleware:** `func RequireAuth(next http.Handler) http.Handler`.
* **Context:** `func GetUser(ctx) User`.
### `pkg/queue` (for Path 2)
* **Interface:**
```go
type JobQueue interface {
Enqueue(ctx context.Context, topic string, payload any) error
Process(ctx context.Context, topic string, handler func(payload []byte) error)
}
```
* **Implementation:** Redis-based (using `go-redis/v9`).
## 3. SDLC Commands (Priority: Critical)
The cookbooks use commands that don't exist. We must create the prompt definitions.
### `.claude/commands/spec-feature.md`
* **Goal:** Generate `spec.md`.
* **Instructions:** "You are a Technical PM. Analyze the feature slug. Write a requirements doc."
### `.claude/commands/design-feature.md`
* **Goal:** Generate `design.md`.
* **Instructions:** "You are a System Architect. Analyze `spec.md`. Define DB Schema, API Contracts, and Package structure."
### `.claude/commands/implement-feature.md`
* **Goal:** Write Code.
* **Instructions:** "You are a Senior Go Developer. Read `design.md`. Implement the changes. Run tests."
## 4. API Verification (Priority: Medium)
We need to ensure `rdev-api` actually handles the `POST /projects/{id}/builds` endpoint with the `prompt` payload correctly (routing it to the Claude Code agent).
* **Check:** `cmd/rdev-api/internal/service/build_service.go` (or similar).
* **Verify:** Does it invoke the `claude` CLI inside the project pod?
## Execution Order
1. **Templates:** Create `worker`, `postgres`, `redis` folders.
2. **Packages:** Write `pkg/auth` and `pkg/queue` code to be included in the `skeleton` template.
3. **Commands:** Create the 3 markdown files in `.claude/commands/`.
4. **Test:** Run `slackpath-1`.

View File

@ -1,67 +0,0 @@
# Slack Preparation Analysis
To successfully build a Slack-like distributed system using the `rdev` agentic workflow, we must bridge the gap between our current capabilities and the requirements of the `slackpath-*` cookbooks.
## 1. Success Enablers (What we have)
* **`pkg/api` Chassis**: A solid Go service foundation (`App` struct) that handles routing, logging (`slog`), middleware, and graceful shutdown. This allows us to spin up consistent APIs quickly.
* **`go-api` Template**: A standardized backend service template in `internal/adapter/templates/templates/go-api` that likely uses the chassis.
* **`skeleton` Template**: A monorepo base to hold multiple services.
* **Tree Runner**: A robust orchestration engine (`tree-runner.sh`) that can execute complex workflows, manage state, and verify deployments.
* **Infrastructure**: We seem to have K8s/Woodpecker integration via `deploy-k8s` skill.
## 2. Functionality Gaps (What is missing)
### Critical Infrastructure
* **Missing Component Types**: The cookbooks assume `type: worker`, `type: postgres`, and `type: redis`. Currently, `internal/adapter/templates/templates` only shows `go-api`, `astro-landing`, and `skeleton`.
* *Risk:* `add-db` and `add-worker` steps will fail in the runner.
* **Missing Shared Packages**:
* `pkg/auth`: No standard JWT verification middleware. Agent will have to write security code from scratch (high risk).
* `pkg/redis` or `pkg/queue`: No standard wrapper for Redis interactions or job queues.
* `pkg/websocket`: No standard WS upgrader/handler.
### SDLC Automation
* **Missing SDLC Commands**: The cookbooks rely on `/spec-feature`, `/design-feature`, and `/implement-feature`. These do **not** exist in `.claude/commands/`.
* **Missing API Logic**: The `rdev-api` endpoints for SDLC (`/sdlc/features`, `/builds`) need to be verified to ensure they support the "Prompt-to-Code" flow described in the cookbooks.
## 3. Required Claude Configuration
We need to create/update these to enable the agentic workflow:
* **`.claude/commands/implement-feature.md`**: Instructions for the agent on how to take a requirement and produce code.
* **`.claude/commands/spec-feature.md`**: Instructions for generating a technical spec artifact.
* **`.claude/commands/design-feature.md`**: Instructions for generating a DB schema/API design artifact.
* **`.claude/skills/distributed-systems.md`**: A skill that teaches the agent about our specific patterns for Worker/API communication (e.g., "Always use `pkg/queue` for async tasks").
## 4. Required SDLC Commands (in `rdev-api`)
The `rdev-api` needs to handle these operations (referenced in cookbooks):
* `POST /projects/{id}/sdlc/features`: Register a feature.
* `POST /projects/{id}/builds`: Trigger an autonomous build task (Prompt -> Code).
* `GET /projects/{id}/sdlc/next`: The "Classifier" logic to tell the runner what to do next.
## 5. Core Packages & Patterns to Implement
Before running `slackpath-1`:
1. **`pkg/auth`**:
* `GenerateToken(user User) (string, error)`
* `Middleware(secret string) func(http.Handler) http.Handler`
* `UserFromContext(ctx) User`
2. **`pkg/queue`** (for Path 2):
* `Producer` interface (`Enqueue(job)`)
* `Consumer` interface (`RegisterHandler(type, func)`)
* Redis implementation.
3. **`pkg/realtime`** (for Path 3):
* Websocket Hub implementation (register, unregister, broadcast).
* Redis Pub/Sub adapter for scaling across pods.
## Action Plan
1. **Scaffold Missing Templates**: Create `worker`, `redis` (helm chart wrapper), and `postgres` (helm chart wrapper) in `internal/adapter/templates/templates`.
2. **Build Shared Libs**: Implement `pkg/auth` and `pkg/queue` to standardize the "hard parts".
3. **Install SDLC Commands**: Create the missing markdown files in `.claude/commands`.
4. **Verify API**: Ensure `rdev-api` has the SDLC endpoints mounted.