Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
214 lines
5.0 KiB
Go
214 lines
5.0 KiB
Go
// Package cached provides caching wrappers for repositories.
|
|
package cached
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// ProjectRepository wraps another ProjectRepository with caching.
|
|
// The cache TTL determines how long the project list is cached before
|
|
// a refresh is needed. Individual project lookups are also cached.
|
|
type ProjectRepository struct {
|
|
inner port.ProjectRepository
|
|
ttl time.Duration
|
|
|
|
mu sync.RWMutex
|
|
projectsCache []domain.Project
|
|
projectMap map[domain.ProjectID]*domain.Project
|
|
lastFetch time.Time
|
|
}
|
|
|
|
// NewProjectRepository creates a caching wrapper around a ProjectRepository.
|
|
func NewProjectRepository(inner port.ProjectRepository, ttl time.Duration) *ProjectRepository {
|
|
if ttl <= 0 {
|
|
ttl = 30 * time.Second // Default cache TTL
|
|
}
|
|
return &ProjectRepository{
|
|
inner: inner,
|
|
ttl: ttl,
|
|
projectMap: make(map[domain.ProjectID]*domain.Project),
|
|
}
|
|
}
|
|
|
|
// List returns all projects, using cache if fresh.
|
|
func (r *ProjectRepository) List(ctx context.Context) ([]domain.Project, error) {
|
|
// Check cache first
|
|
r.mu.RLock()
|
|
if r.isCacheFresh() {
|
|
projects := make([]domain.Project, len(r.projectsCache))
|
|
copy(projects, r.projectsCache)
|
|
r.mu.RUnlock()
|
|
return projects, nil
|
|
}
|
|
r.mu.RUnlock()
|
|
|
|
// Cache miss - acquire write lock and refresh
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Double-check after acquiring write lock
|
|
if r.isCacheFresh() {
|
|
projects := make([]domain.Project, len(r.projectsCache))
|
|
copy(projects, r.projectsCache)
|
|
return projects, nil
|
|
}
|
|
|
|
// Fetch from inner repository
|
|
projects, err := r.inner.List(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Update cache
|
|
r.projectsCache = projects
|
|
r.projectMap = make(map[domain.ProjectID]*domain.Project, len(projects))
|
|
for i := range projects {
|
|
r.projectMap[projects[i].ID] = &projects[i]
|
|
}
|
|
r.lastFetch = time.Now()
|
|
|
|
// Return a copy to prevent mutation
|
|
result := make([]domain.Project, len(projects))
|
|
copy(result, projects)
|
|
return result, nil
|
|
}
|
|
|
|
// Get returns a single project by ID, using cache if available.
|
|
func (r *ProjectRepository) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
|
|
// Check cache first
|
|
r.mu.RLock()
|
|
if r.isCacheFresh() {
|
|
if p, ok := r.projectMap[id]; ok {
|
|
// Return a copy
|
|
copied := *p
|
|
r.mu.RUnlock()
|
|
return &copied, nil
|
|
}
|
|
r.mu.RUnlock()
|
|
return nil, domain.ErrProjectNotFound
|
|
}
|
|
r.mu.RUnlock()
|
|
|
|
// Cache stale - refresh and try again
|
|
_, err := r.List(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if p, ok := r.projectMap[id]; ok {
|
|
copied := *p
|
|
return &copied, nil
|
|
}
|
|
return nil, domain.ErrProjectNotFound
|
|
}
|
|
|
|
// Exists checks if a project exists by ID.
|
|
func (r *ProjectRepository) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
|
|
r.mu.RLock()
|
|
if r.isCacheFresh() {
|
|
_, exists := r.projectMap[id]
|
|
r.mu.RUnlock()
|
|
return exists, nil
|
|
}
|
|
r.mu.RUnlock()
|
|
|
|
// Cache stale - refresh
|
|
_, err := r.List(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
_, exists := r.projectMap[id]
|
|
return exists, nil
|
|
}
|
|
|
|
// RefreshStatus refreshes project status from the underlying repository.
|
|
// This bypasses the cache and forces a refresh.
|
|
func (r *ProjectRepository) RefreshStatus(ctx context.Context) error {
|
|
err := r.inner.RefreshStatus(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invalidate cache so next List() fetches fresh data
|
|
r.mu.Lock()
|
|
r.lastFetch = time.Time{} // Zero time = stale
|
|
r.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Register is a pass-through that invalidates cache after registration.
|
|
func (r *ProjectRepository) Register(ctx context.Context, p *domain.Project) error {
|
|
err := r.inner.Register(ctx, p)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.lastFetch = time.Time{} // Invalidate cache
|
|
r.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Unregister is a pass-through that invalidates cache after unregistration.
|
|
func (r *ProjectRepository) Unregister(ctx context.Context, id domain.ProjectID) error {
|
|
err := r.inner.Unregister(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.lastFetch = time.Time{} // Invalidate cache
|
|
r.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// isCacheFresh checks if the cache is still within TTL.
|
|
// Must be called with at least a read lock held.
|
|
func (r *ProjectRepository) isCacheFresh() bool {
|
|
if r.lastFetch.IsZero() {
|
|
return false
|
|
}
|
|
return time.Since(r.lastFetch) < r.ttl
|
|
}
|
|
|
|
// Invalidate forces a cache refresh on next access.
|
|
func (r *ProjectRepository) Invalidate() {
|
|
r.mu.Lock()
|
|
r.lastFetch = time.Time{}
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
// CacheStats returns statistics about the cache.
|
|
func (r *ProjectRepository) CacheStats() CacheStats {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
return CacheStats{
|
|
Size: len(r.projectsCache),
|
|
LastFetch: r.lastFetch,
|
|
IsFresh: r.isCacheFresh(),
|
|
TTL: r.ttl,
|
|
}
|
|
}
|
|
|
|
// CacheStats is a point-in-time description of a ProjectRepository cache.
type CacheStats struct {
	// Size is the number of projects held in the cache.
	Size int
	// LastFetch is when the cache was last filled; it is the zero time
	// when the cache has been invalidated or never filled.
	LastFetch time.Time
	// IsFresh tells whether the cache was still within its TTL.
	IsFresh bool
	// TTL is the configured cache lifetime.
	TTL time.Duration
}
|