Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
236 lines
5.9 KiB
Go
236 lines
5.9 KiB
Go
package memory
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// StreamPublisher is an in-memory implementation of port.StreamPublisher
|
|
// with event ID generation and replay buffer support.
|
|
type StreamPublisher struct {
|
|
mu sync.RWMutex
|
|
streams map[string]*streamState
|
|
}
|
|
|
|
// subscriber wraps a channel with closed state to prevent send-on-closed-channel races.
|
|
type subscriber struct {
|
|
ch chan port.StreamEvent
|
|
closed atomic.Bool
|
|
mu sync.Mutex // protects close and send operations
|
|
}
|
|
|
|
// trySend attempts to send an event to the subscriber's channel.
|
|
// Returns false if the subscriber is closed or channel is full.
|
|
// This is safe to call concurrently with close operations.
|
|
func (s *subscriber) trySend(event port.StreamEvent) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.closed.Load() {
|
|
return false
|
|
}
|
|
|
|
select {
|
|
case s.ch <- event:
|
|
return true
|
|
default:
|
|
return false // Channel full
|
|
}
|
|
}
|
|
|
|
// doClose closes the subscriber channel safely.
|
|
// This is safe to call concurrently with send operations.
|
|
func (s *subscriber) doClose() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if !s.closed.Swap(true) {
|
|
// Only close if we're the one who set closed to true
|
|
close(s.ch)
|
|
}
|
|
}
|
|
|
|
// streamState holds the state for a single stream.
|
|
type streamState struct {
|
|
subscribers []*subscriber
|
|
eventBuffer []port.StreamEvent // Circular buffer for replay
|
|
eventSeq atomic.Uint64 // Monotonic event sequence
|
|
bufferSize int // Max events to keep for replay
|
|
}
|
|
|
|
// NewStreamPublisher creates a new in-memory stream publisher.
|
|
func NewStreamPublisher() *StreamPublisher {
|
|
return &StreamPublisher{
|
|
streams: make(map[string]*streamState),
|
|
}
|
|
}
|
|
|
|
// Ensure StreamPublisher implements port.StreamPublisher at compile time.
|
|
var _ port.StreamPublisher = (*StreamPublisher)(nil)
|
|
|
|
// getOrCreateStream returns the stream state, creating it if necessary.
|
|
func (sp *StreamPublisher) getOrCreateStream(streamID string) *streamState {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
state = &streamState{
|
|
eventBuffer: make([]port.StreamEvent, 0, 100),
|
|
bufferSize: 100, // Keep last 100 events for replay
|
|
}
|
|
sp.streams[streamID] = state
|
|
}
|
|
return state
|
|
}
|
|
|
|
// Subscribe creates a subscription to events for the given stream ID.
|
|
func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
|
|
return sp.SubscribeFromID(streamID, "")
|
|
}
|
|
|
|
// SubscribeFromID creates a subscription starting from a specific event ID.
|
|
// Events since lastEventID will be replayed before new events are delivered.
|
|
func (sp *StreamPublisher) SubscribeFromID(streamID string, lastEventID string) (<-chan port.StreamEvent, func()) {
|
|
state := sp.getOrCreateStream(streamID)
|
|
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
sub := &subscriber{
|
|
ch: make(chan port.StreamEvent, 100),
|
|
}
|
|
state.subscribers = append(state.subscribers, sub)
|
|
|
|
// Replay events if lastEventID is provided
|
|
if lastEventID != "" {
|
|
go sp.replayEvents(sub, state, lastEventID)
|
|
}
|
|
|
|
// Return cleanup function
|
|
cleanup := func() {
|
|
sp.unsubscribe(streamID, sub)
|
|
}
|
|
|
|
return sub.ch, cleanup
|
|
}
|
|
|
|
// replayEvents sends buffered events that occurred after lastEventID.
|
|
func (sp *StreamPublisher) replayEvents(sub *subscriber, state *streamState, lastEventID string) {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
found := false
|
|
for _, event := range state.eventBuffer {
|
|
if found {
|
|
if !sub.trySend(event) && sub.closed.Load() {
|
|
return // Subscriber closed, stop replay
|
|
}
|
|
}
|
|
if event.ID == lastEventID {
|
|
found = true
|
|
}
|
|
}
|
|
|
|
// If we didn't find the lastEventID (too old), replay all buffered events
|
|
if !found && lastEventID != "" {
|
|
for _, event := range state.eventBuffer {
|
|
if !sub.trySend(event) && sub.closed.Load() {
|
|
return // Subscriber closed, stop replay
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sp *StreamPublisher) unsubscribe(streamID string, sub *subscriber) {
|
|
// Close the subscriber channel safely (handles concurrent sends)
|
|
sub.doClose()
|
|
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
for i, s := range state.subscribers {
|
|
if s == sub {
|
|
state.subscribers = append(state.subscribers[:i], state.subscribers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish sends an event to all subscribers of a stream.
|
|
// Returns the generated event ID.
|
|
func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) string {
|
|
state := sp.getOrCreateStream(streamID)
|
|
|
|
// Generate event ID
|
|
seq := state.eventSeq.Add(1)
|
|
event.ID = fmt.Sprintf("%s:%d", streamID, seq)
|
|
|
|
sp.mu.Lock()
|
|
// Add to buffer for replay
|
|
if len(state.eventBuffer) >= state.bufferSize {
|
|
// Remove oldest event
|
|
state.eventBuffer = state.eventBuffer[1:]
|
|
}
|
|
state.eventBuffer = append(state.eventBuffer, event)
|
|
// Copy subscriber pointers (safe - trySend handles concurrent close)
|
|
subscribers := make([]*subscriber, len(state.subscribers))
|
|
copy(subscribers, state.subscribers)
|
|
sp.mu.Unlock()
|
|
|
|
// Send to all subscribers using thread-safe trySend
|
|
for _, sub := range subscribers {
|
|
sub.trySend(event)
|
|
}
|
|
|
|
return event.ID
|
|
}
|
|
|
|
// Close closes a stream and all its subscriptions.
|
|
func (sp *StreamPublisher) Close(streamID string) {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
for _, sub := range state.subscribers {
|
|
sub.doClose()
|
|
}
|
|
delete(sp.streams, streamID)
|
|
}
|
|
|
|
// SubscriberCount returns the number of subscribers for a stream (for testing).
|
|
func (sp *StreamPublisher) SubscriberCount(streamID string) int {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return 0
|
|
}
|
|
return len(state.subscribers)
|
|
}
|
|
|
|
// BufferedEventCount returns the number of buffered events for a stream (for testing).
|
|
func (sp *StreamPublisher) BufferedEventCount(streamID string) int {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return 0
|
|
}
|
|
return len(state.eventBuffer)
|
|
}
|