rdev/internal/adapter/cached/project_repository.go
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

214 lines
5.0 KiB
Go

// Package cached provides caching wrappers for repositories.
package cached
import (
"context"
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// ProjectRepository wraps another ProjectRepository with caching.
// The cache TTL determines how long the project list is cached before
// a refresh is needed. Individual project lookups are also cached.
//
// The cache fields (projectsCache, projectMap, lastFetch) are guarded by mu.
// Because the struct embeds a sync.RWMutex by value it must not be copied;
// use it through the pointer returned by NewProjectRepository.
type ProjectRepository struct {
	// inner is the wrapped repository that refreshes and writes are delegated to.
	inner port.ProjectRepository
	// ttl is how long a fetched project list is treated as fresh.
	ttl time.Duration

	// mu guards the cache fields below.
	mu sync.RWMutex
	// projectsCache is the project list stored by the most recent successful refresh.
	projectsCache []domain.Project
	// projectMap indexes projectsCache by project ID; its values point at
	// elements of that slice.
	projectMap map[domain.ProjectID]*domain.Project
	// lastFetch is the time of the most recent refresh; the zero value marks
	// the cache as stale.
	lastFetch time.Time
}
// NewProjectRepository builds a caching decorator around inner.
//
// ttl controls how long a fetched project list is served from memory. A
// zero or negative ttl selects the default of 30 seconds. The returned
// repository starts with an empty, stale cache, so the first read goes to
// inner.
func NewProjectRepository(inner port.ProjectRepository, ttl time.Duration) *ProjectRepository {
	repo := &ProjectRepository{
		inner:      inner,
		ttl:        30 * time.Second, // default, replaced below when the caller supplies a usable value
		projectMap: make(map[domain.ProjectID]*domain.Project),
	}
	if ttl > 0 {
		repo.ttl = ttl
	}
	return repo
}
// List returns every project, serving from the cache while it is fresh.
//
// When the cache is stale the list is fetched from the inner repository
// under the exclusive lock, and both the cached slice and the ID index are
// rebuilt. The slice handed back to the caller is always a copy of the
// cached slice, so callers cannot modify the cache's own slice. An error
// from the inner repository is returned unchanged and leaves the existing
// cache contents untouched.
func (r *ProjectRepository) List(ctx context.Context) ([]domain.Project, error) {
	// snapshot copies the cached list. The caller must hold r.mu.
	snapshot := func() []domain.Project {
		out := make([]domain.Project, len(r.projectsCache))
		copy(out, r.projectsCache)
		return out
	}

	// Fast path: a fresh cache only needs the shared lock.
	r.mu.RLock()
	fresh := r.isCacheFresh()
	var cached []domain.Project
	if fresh {
		cached = snapshot()
	}
	r.mu.RUnlock()
	if fresh {
		return cached, nil
	}

	// Slow path: refresh under the exclusive lock.
	r.mu.Lock()
	defer r.mu.Unlock()

	// Another goroutine may have refreshed the cache while this one was
	// waiting for the exclusive lock; re-check before hitting inner.
	if r.isCacheFresh() {
		return snapshot(), nil
	}

	fetched, err := r.inner.List(ctx)
	if err != nil {
		return nil, err
	}

	// Build the ID index so each entry points at its element of the new slice.
	index := make(map[domain.ProjectID]*domain.Project, len(fetched))
	for i := range fetched {
		index[fetched[i].ID] = &fetched[i]
	}
	r.projectsCache = fetched
	r.projectMap = index
	r.lastFetch = time.Now()

	return snapshot(), nil
}
// Get looks up one project by ID.
//
// While the cache is fresh it is treated as authoritative: a hit returns a
// copy of the cached struct, and a miss returns domain.ErrProjectNotFound
// without consulting the inner repository. When the cache is stale it is
// refreshed through List first, and the lookup is then made against the
// refreshed index. Errors from the refresh are returned unchanged.
func (r *ProjectRepository) Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) {
	// lookup resolves id against the index and copies the match so callers
	// never receive a pointer into the cache. The caller must hold r.mu.
	lookup := func() (*domain.Project, error) {
		entry, found := r.projectMap[id]
		if !found {
			return nil, domain.ErrProjectNotFound
		}
		clone := *entry
		return &clone, nil
	}

	r.mu.RLock()
	if r.isCacheFresh() {
		project, err := lookup()
		r.mu.RUnlock()
		return project, err
	}
	r.mu.RUnlock()

	// Stale: List rebuilds the cache; its returned slice is not needed here.
	if _, err := r.List(ctx); err != nil {
		return nil, err
	}

	r.mu.RLock()
	defer r.mu.RUnlock()
	return lookup()
}
// Exists reports whether a project with the given ID is present.
//
// A fresh cache answers directly from the ID index. A stale cache is
// refreshed through List first, and the answer is then read from the
// refreshed index. Errors from the refresh are returned unchanged together
// with false.
func (r *ProjectRepository) Exists(ctx context.Context, id domain.ProjectID) (bool, error) {
	r.mu.RLock()
	fresh := r.isCacheFresh()
	_, known := r.projectMap[id]
	r.mu.RUnlock()
	if fresh {
		return known, nil
	}

	// Stale: the membership read above is discarded and redone after refresh.
	if _, err := r.List(ctx); err != nil {
		return false, err
	}

	r.mu.RLock()
	_, known = r.projectMap[id]
	r.mu.RUnlock()
	return known, nil
}
// RefreshStatus asks the inner repository to refresh project status and, on
// success, marks the cache stale so the next read re-fetches the list.
// If the inner call fails its error is returned and the cache is left as is.
func (r *ProjectRepository) RefreshStatus(ctx context.Context) error {
	if err := r.inner.RefreshStatus(ctx); err != nil {
		return err
	}
	r.Invalidate()
	return nil
}
// Register forwards the registration to the inner repository and, on
// success, marks the cache stale so the next read re-fetches the list.
// If the inner call fails its error is returned and the cache is left as is.
func (r *ProjectRepository) Register(ctx context.Context, p *domain.Project) error {
	if err := r.inner.Register(ctx, p); err != nil {
		return err
	}
	r.Invalidate()
	return nil
}
// Unregister forwards the removal to the inner repository and, on success,
// marks the cache stale so the next read re-fetches the list.
// If the inner call fails its error is returned and the cache is left as is.
func (r *ProjectRepository) Unregister(ctx context.Context, id domain.ProjectID) error {
	if err := r.inner.Unregister(ctx, id); err != nil {
		return err
	}
	r.Invalidate()
	return nil
}
// isCacheFresh reports whether the cache has been filled and is still
// within its TTL. A zero lastFetch always counts as stale.
// The caller must hold r.mu (read or write).
func (r *ProjectRepository) isCacheFresh() bool {
	return !r.lastFetch.IsZero() && time.Since(r.lastFetch) < r.ttl
}
// Invalidate marks the cache stale so the next read re-fetches from the
// inner repository. The cached entries themselves are kept until that
// refresh replaces them.
func (r *ProjectRepository) Invalidate() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// The zero time is the "never fetched" marker checked by isCacheFresh.
	r.lastFetch = time.Time{}
}
// CacheStats returns a point-in-time snapshot of the cache: how many
// projects are held, when they were fetched, whether they are still within
// the TTL, and the TTL itself. All fields are read under the shared lock.
func (r *ProjectRepository) CacheStats() CacheStats {
	r.mu.RLock()
	stats := CacheStats{
		Size:      len(r.projectsCache),
		LastFetch: r.lastFetch,
		IsFresh:   r.isCacheFresh(),
		TTL:       r.ttl,
	}
	r.mu.RUnlock()
	return stats
}
// CacheStats contains cache statistics, as reported by the CacheStats
// method of ProjectRepository.
type CacheStats struct {
	// Size is the number of projects in the cached list, whether or not the
	// cache is still fresh.
	Size int
	// LastFetch is when the cache was last filled; it is the zero time if the
	// cache has never been filled or has been invalidated.
	LastFetch time.Time
	// IsFresh reports whether the cache was within its TTL when the stats
	// were taken.
	IsFresh bool
	// TTL is the configured cache lifetime.
	TTL time.Duration
}