124 lines
3.2 KiB
Go
124 lines
3.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// MemoryQueue is an in-memory job queue that dispatches jobs to registered handlers
|
|
// in goroutines. Use this for local development when no database is available.
|
|
// Implements Producer and JobReader so the service handlers can enqueue and query jobs.
|
|
type MemoryQueue struct {
|
|
handlers map[string]Handler
|
|
jobs map[string]*Job
|
|
mu sync.RWMutex
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// NewMemoryQueue creates an in-memory queue for standalone/development mode.
|
|
func NewMemoryQueue(logger *slog.Logger) *MemoryQueue {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
return &MemoryQueue{
|
|
handlers: make(map[string]Handler),
|
|
jobs: make(map[string]*Job),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// RegisterHandler registers a handler for a specific job type.
|
|
// When Enqueue is called with this job type, the handler runs in a goroutine.
|
|
func (q *MemoryQueue) RegisterHandler(jobType string, handler Handler) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
q.handlers[jobType] = handler
|
|
q.logger.Info("registered in-memory job handler", "job_type", jobType)
|
|
}
|
|
|
|
// Enqueue creates a job and immediately dispatches it to the registered handler in a goroutine.
|
|
func (q *MemoryQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) {
|
|
return q.EnqueueWithOptions(ctx, Job{
|
|
Type: jobType,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
|
|
// EnqueueWithOptions creates a job with custom options and dispatches it.
|
|
func (q *MemoryQueue) EnqueueWithOptions(_ context.Context, job Job) (string, error) {
|
|
if job.ID == "" {
|
|
job.ID = uuid.New().String()
|
|
}
|
|
if job.CreatedAt.IsZero() {
|
|
job.CreatedAt = time.Now().UTC()
|
|
}
|
|
job.Status = StatusPending
|
|
|
|
q.mu.RLock()
|
|
handler, ok := q.handlers[job.Type]
|
|
q.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return "", fmt.Errorf("no handler registered for job type %q", job.Type)
|
|
}
|
|
|
|
// Store job for status tracking.
|
|
q.mu.Lock()
|
|
q.jobs[job.ID] = &job
|
|
q.mu.Unlock()
|
|
|
|
q.logger.Info("dispatching in-memory job", "job_id", job.ID, "job_type", job.Type)
|
|
|
|
// Process in background goroutine (mirrors worker behavior).
|
|
go func() {
|
|
q.mu.Lock()
|
|
job.Status = StatusRunning
|
|
now := time.Now().UTC()
|
|
job.StartedAt = &now
|
|
q.mu.Unlock()
|
|
|
|
if err := handler(context.Background(), &job); err != nil {
|
|
q.mu.Lock()
|
|
job.Status = StatusFailed
|
|
job.Error = err.Error()
|
|
completed := time.Now().UTC()
|
|
job.CompletedAt = &completed
|
|
q.mu.Unlock()
|
|
q.logger.Error("in-memory job failed", "job_id", job.ID, "job_type", job.Type, "error", err)
|
|
} else {
|
|
q.mu.Lock()
|
|
job.Status = StatusCompleted
|
|
completed := time.Now().UTC()
|
|
job.CompletedAt = &completed
|
|
q.mu.Unlock()
|
|
q.logger.Info("in-memory job completed", "job_id", job.ID, "job_type", job.Type)
|
|
}
|
|
}()
|
|
|
|
return job.ID, nil
|
|
}
|
|
|
|
// GetJob returns a job by ID. Returns ErrJobNotFound if the job doesn't exist.
|
|
func (q *MemoryQueue) GetJob(_ context.Context, jobID string) (*Job, error) {
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
|
|
job, ok := q.jobs[jobID]
|
|
if !ok {
|
|
return nil, ErrJobNotFound
|
|
}
|
|
cp := *job
|
|
return &cp, nil
|
|
}
|
|
|
|
// Compile-time checks.
|
|
var (
|
|
_ Producer = (*MemoryQueue)(nil)
|
|
_ JobReader = (*MemoryQueue)(nil)
|
|
)
|