124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"git.threesix.ai/jordan/slack5-1770529463/pkg/logging"
|
|
)
|
|
|
|
// LoggingMiddleware logs job processing events.
|
|
// Logs job start, completion/failure, and duration.
|
|
func LoggingMiddleware(logger *logging.Logger) func(Handler) Handler {
|
|
return func(next Handler) Handler {
|
|
return func(ctx context.Context, job *Job) error {
|
|
start := time.Now()
|
|
logger.Info("job started",
|
|
"job_id", job.ID,
|
|
"type", job.Type,
|
|
"retry_count", job.RetryCount,
|
|
)
|
|
|
|
err := next(ctx, job)
|
|
|
|
duration := time.Since(start)
|
|
if err != nil {
|
|
logger.Error("job failed",
|
|
"job_id", job.ID,
|
|
"type", job.Type,
|
|
"duration", duration,
|
|
"error", err,
|
|
)
|
|
} else {
|
|
logger.Info("job completed",
|
|
"job_id", job.ID,
|
|
"type", job.Type,
|
|
"duration", duration,
|
|
)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecoveryMiddleware catches panics and converts them to errors.
|
|
// Prevents a single panicking job from crashing the worker.
|
|
func RecoveryMiddleware(logger *logging.Logger) func(Handler) Handler {
|
|
return func(next Handler) Handler {
|
|
return func(ctx context.Context, job *Job) (err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
stack := debug.Stack()
|
|
logger.Error("job panic recovered",
|
|
"job_id", job.ID,
|
|
"type", job.Type,
|
|
"panic", r,
|
|
"stack", string(stack),
|
|
)
|
|
err = fmt.Errorf("panic in job handler: %v", r)
|
|
}
|
|
}()
|
|
return next(ctx, job)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TimeoutMiddleware enforces a maximum job processing time.
|
|
// Jobs that exceed the timeout return a context deadline exceeded error.
|
|
func TimeoutMiddleware(timeout time.Duration) func(Handler) Handler {
|
|
return func(next Handler) Handler {
|
|
return func(ctx context.Context, job *Job) error {
|
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
return next(ctx, job)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MetricsCallbacks holds the optional hooks that MetricsMiddleware invokes
// while processing a job, for integration with an observability system.
//
// Any field may be left nil, in which case the corresponding event is skipped.
type MetricsCallbacks struct {
	// OnJobStarted is called with the job's type before the handler runs.
	OnJobStarted func(jobType string)
	// OnJobCompleted is called with the job's type and the elapsed handler
	// time when the handler returns a nil error.
	OnJobCompleted func(jobType string, duration time.Duration)
	// OnJobFailed is called with the job's type, the elapsed handler time and
	// the returned error when the handler returns a non-nil error.
	OnJobFailed func(jobType string, duration time.Duration, err error)
}
|
|
|
|
// MetricsMiddleware creates middleware that tracks job processing metrics.
|
|
func MetricsMiddleware(callbacks MetricsCallbacks) func(Handler) Handler {
|
|
return func(next Handler) Handler {
|
|
return func(ctx context.Context, job *Job) error {
|
|
if callbacks.OnJobStarted != nil {
|
|
callbacks.OnJobStarted(job.Type)
|
|
}
|
|
|
|
start := time.Now()
|
|
err := next(ctx, job)
|
|
duration := time.Since(start)
|
|
|
|
if err != nil {
|
|
if callbacks.OnJobFailed != nil {
|
|
callbacks.OnJobFailed(job.Type, duration, err)
|
|
}
|
|
} else {
|
|
if callbacks.OnJobCompleted != nil {
|
|
callbacks.OnJobCompleted(job.Type, duration)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Chain combines multiple middleware into a single middleware.
|
|
// Middleware is applied in order: Chain(a, b, c)(h) = a(b(c(h)))
|
|
func Chain(middlewares ...func(Handler) Handler) func(Handler) Handler {
|
|
return func(h Handler) Handler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
h = middlewares[i](h)
|
|
}
|
|
return h
|
|
}
|
|
}
|