package memory import ( "fmt" "sync" "sync/atomic" "github.com/orchard9/rdev/internal/port" ) // StreamPublisher is an in-memory implementation of port.StreamPublisher // with event ID generation and replay buffer support. type StreamPublisher struct { mu sync.RWMutex streams map[string]*streamState } // subscriber wraps a channel with closed state to prevent send-on-closed-channel races. type subscriber struct { ch chan port.StreamEvent closed atomic.Bool mu sync.Mutex // protects close and send operations } // trySend attempts to send an event to the subscriber's channel. // Returns false if the subscriber is closed or channel is full. // This is safe to call concurrently with close operations. func (s *subscriber) trySend(event port.StreamEvent) bool { s.mu.Lock() defer s.mu.Unlock() if s.closed.Load() { return false } select { case s.ch <- event: return true default: return false // Channel full } } // doClose closes the subscriber channel safely. // This is safe to call concurrently with send operations. func (s *subscriber) doClose() { s.mu.Lock() defer s.mu.Unlock() if !s.closed.Swap(true) { // Only close if we're the one who set closed to true close(s.ch) } } // streamState holds the state for a single stream. type streamState struct { subscribers []*subscriber eventBuffer []port.StreamEvent // Circular buffer for replay eventSeq atomic.Uint64 // Monotonic event sequence bufferSize int // Max events to keep for replay } // NewStreamPublisher creates a new in-memory stream publisher. func NewStreamPublisher() *StreamPublisher { return &StreamPublisher{ streams: make(map[string]*streamState), } } // Ensure StreamPublisher implements port.StreamPublisher at compile time. var _ port.StreamPublisher = (*StreamPublisher)(nil) // getOrCreateStream returns the stream state, creating it if necessary. func (sp *StreamPublisher) getOrCreateStream(streamID string) *streamState { sp.mu.Lock() defer sp.mu.Unlock() state, exists := sp.streams[streamID] if !exists { state = &streamState{ eventBuffer: make([]port.StreamEvent, 0, 100), bufferSize: 100, // Keep last 100 events for replay } sp.streams[streamID] = state } return state } // Subscribe creates a subscription to events for the given stream ID. func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) { return sp.SubscribeFromID(streamID, "") } // SubscribeFromID creates a subscription starting from a specific event ID. // Events since lastEventID will be replayed before new events are delivered. func (sp *StreamPublisher) SubscribeFromID(streamID string, lastEventID string) (<-chan port.StreamEvent, func()) { state := sp.getOrCreateStream(streamID) sp.mu.Lock() defer sp.mu.Unlock() sub := &subscriber{ ch: make(chan port.StreamEvent, 100), } state.subscribers = append(state.subscribers, sub) // Replay events if lastEventID is provided if lastEventID != "" { go sp.replayEvents(sub, state, lastEventID) } // Return cleanup function cleanup := func() { sp.unsubscribe(streamID, sub) } return sub.ch, cleanup } // replayEvents sends buffered events that occurred after lastEventID. func (sp *StreamPublisher) replayEvents(sub *subscriber, state *streamState, lastEventID string) { sp.mu.RLock() defer sp.mu.RUnlock() found := false for _, event := range state.eventBuffer { if found { if !sub.trySend(event) && sub.closed.Load() { return // Subscriber closed, stop replay } } if event.ID == lastEventID { found = true } } // If we didn't find the lastEventID (too old), replay all buffered events if !found && lastEventID != "" { for _, event := range state.eventBuffer { if !sub.trySend(event) && sub.closed.Load() { return // Subscriber closed, stop replay } } } } func (sp *StreamPublisher) unsubscribe(streamID string, sub *subscriber) { // Close the subscriber channel safely (handles concurrent sends) sub.doClose() sp.mu.Lock() defer sp.mu.Unlock() state, exists := sp.streams[streamID] if !exists { return } for i, s := range state.subscribers { if s == sub { state.subscribers = append(state.subscribers[:i], state.subscribers[i+1:]...) break } } } // Publish sends an event to all subscribers of a stream. // Returns the generated event ID. func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) string { state := sp.getOrCreateStream(streamID) // Generate event ID seq := state.eventSeq.Add(1) event.ID = fmt.Sprintf("%s:%d", streamID, seq) sp.mu.Lock() // Add to buffer for replay if len(state.eventBuffer) >= state.bufferSize { // Remove oldest event state.eventBuffer = state.eventBuffer[1:] } state.eventBuffer = append(state.eventBuffer, event) // Copy subscriber pointers (safe - trySend handles concurrent close) subscribers := make([]*subscriber, len(state.subscribers)) copy(subscribers, state.subscribers) sp.mu.Unlock() // Send to all subscribers using thread-safe trySend for _, sub := range subscribers { sub.trySend(event) } return event.ID } // Close closes a stream and all its subscriptions. func (sp *StreamPublisher) Close(streamID string) { sp.mu.Lock() defer sp.mu.Unlock() state, exists := sp.streams[streamID] if !exists { return } for _, sub := range state.subscribers { sub.doClose() } delete(sp.streams, streamID) } // SubscriberCount returns the number of subscribers for a stream (for testing). func (sp *StreamPublisher) SubscriberCount(streamID string) int { sp.mu.RLock() defer sp.mu.RUnlock() state, exists := sp.streams[streamID] if !exists { return 0 } return len(state.subscribers) } // BufferedEventCount returns the number of buffered events for a stream (for testing). func (sp *StreamPublisher) BufferedEventCount(streamID string) int { sp.mu.RLock() defer sp.mu.RUnlock() state, exists := sp.streams[streamID] if !exists { return 0 } return len(state.eventBuffer) }