slack5-1770523903/pkg/queue/middleware.go
jordan 7c67584b50
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-08 04:11:44 +00:00

124 lines
3.1 KiB
Go

package queue
import (
"context"
"fmt"
"runtime/debug"
"time"
"git.threesix.ai/jordan/slack5-1770523903/pkg/logging"
)
// LoggingMiddleware logs job processing events.
// Logs job start, completion/failure, and duration.
func LoggingMiddleware(logger *logging.Logger) func(Handler) Handler {
return func(next Handler) Handler {
return func(ctx context.Context, job *Job) error {
start := time.Now()
logger.Info("job started",
"job_id", job.ID,
"type", job.Type,
"retry_count", job.RetryCount,
)
err := next(ctx, job)
duration := time.Since(start)
if err != nil {
logger.Error("job failed",
"job_id", job.ID,
"type", job.Type,
"duration", duration,
"error", err,
)
} else {
logger.Info("job completed",
"job_id", job.ID,
"type", job.Type,
"duration", duration,
)
}
return err
}
}
}
// RecoveryMiddleware catches panics and converts them to errors.
// Prevents a single panicking job from crashing the worker.
func RecoveryMiddleware(logger *logging.Logger) func(Handler) Handler {
return func(next Handler) Handler {
return func(ctx context.Context, job *Job) (err error) {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
logger.Error("job panic recovered",
"job_id", job.ID,
"type", job.Type,
"panic", r,
"stack", string(stack),
)
err = fmt.Errorf("panic in job handler: %v", r)
}
}()
return next(ctx, job)
}
}
}
// TimeoutMiddleware enforces a maximum job processing time.
// Jobs that exceed the timeout return a context deadline exceeded error.
func TimeoutMiddleware(timeout time.Duration) func(Handler) Handler {
return func(next Handler) Handler {
return func(ctx context.Context, job *Job) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return next(ctx, job)
}
}
}
// MetricsMiddleware tracks job processing metrics.
// Calls the provided callbacks for observability integration.
type MetricsCallbacks struct {
OnJobStarted func(jobType string)
OnJobCompleted func(jobType string, duration time.Duration)
OnJobFailed func(jobType string, duration time.Duration, err error)
}
// MetricsMiddleware creates middleware that tracks job processing metrics.
func MetricsMiddleware(callbacks MetricsCallbacks) func(Handler) Handler {
return func(next Handler) Handler {
return func(ctx context.Context, job *Job) error {
if callbacks.OnJobStarted != nil {
callbacks.OnJobStarted(job.Type)
}
start := time.Now()
err := next(ctx, job)
duration := time.Since(start)
if err != nil {
if callbacks.OnJobFailed != nil {
callbacks.OnJobFailed(job.Type, duration, err)
}
} else {
if callbacks.OnJobCompleted != nil {
callbacks.OnJobCompleted(job.Type, duration)
}
}
return err
}
}
}
// Chain combines multiple middleware into a single middleware.
// Middleware is applied in order: Chain(a, b, c)(h) = a(b(c(h)))
func Chain(middlewares ...func(Handler) Handler) func(Handler) Handler {
return func(h Handler) Handler {
for i := len(middlewares) - 1; i >= 0; i-- {
h = middlewares[i](h)
}
return h
}
}