package memory

import (
	"sync"

	"github.com/orchard9/rdev/internal/port"
)

// StreamPublisher is an in-memory implementation of port.StreamPublisher.
//
// It fans events out to per-subscriber buffered channels, grouped by stream
// ID. All access to the subscriber table is guarded by mu.
type StreamPublisher struct {
	mu sync.RWMutex
	// streams maps a stream ID to the channels of its current subscribers.
	// A key is present only while the stream has at least one subscriber.
	streams map[string][]chan port.StreamEvent
}

// NewStreamPublisher creates a new in-memory stream publisher with no
// subscribers.
func NewStreamPublisher() *StreamPublisher {
	return &StreamPublisher{
		streams: make(map[string][]chan port.StreamEvent),
	}
}

// Ensure StreamPublisher implements port.StreamPublisher at compile time.
var _ port.StreamPublisher = (*StreamPublisher)(nil)

// Subscribe creates a subscription to events for the given stream ID.
//
// It returns a receive-only channel on which published events are delivered
// and a cleanup function that removes the subscription and closes the
// channel. The cleanup function may be called more than once, and may be
// called after Close; once the channel is no longer registered it is a no-op.
func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
	// Per-subscriber buffer capacity. Publish drops events for a subscriber
	// whose buffer is full rather than blocking.
	const bufferSize = 100

	sp.mu.Lock()
	defer sp.mu.Unlock()

	ch := make(chan port.StreamEvent, bufferSize)
	sp.streams[streamID] = append(sp.streams[streamID], ch)

	cleanup := func() {
		sp.unsubscribe(streamID, ch)
	}
	return ch, cleanup
}

// unsubscribe removes ch from the subscribers of streamID and closes it.
//
// If ch is not registered under streamID (already unsubscribed, or the stream
// was closed) nothing happens, so the channel is never closed twice. When the
// last subscriber of a stream is removed, the stream's map entry is deleted so
// that the table does not accumulate empty entries for abandoned stream IDs.
func (sp *StreamPublisher) unsubscribe(streamID string, ch chan port.StreamEvent) {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	subs := sp.streams[streamID]
	for i, c := range subs {
		if c != ch {
			continue
		}
		close(ch)

		if len(subs) == 1 {
			// Last subscriber gone: drop the key entirely.
			delete(sp.streams, streamID)
			return
		}

		// Shift the remaining subscribers down, preserving their order, and
		// clear the vacated tail slot so the backing array does not keep a
		// reference to a removed channel.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		sp.streams[streamID] = subs[:last]
		return
	}
}

// Publish sends an event to all subscribers of a stream.
//
// Delivery is non-blocking: if a subscriber's buffer is full, the event is
// dropped for that subscriber. Publishing to a stream with no subscribers is
// a no-op.
func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	// Channels are only closed while holding the write lock and are removed
	// from the table in the same critical section, so every channel seen here
	// is still open.
	for _, ch := range sp.streams[streamID] {
		select {
		case ch <- event:
		default:
			// Channel full, skip
		}
	}
}

// Close closes a stream and all its subscriptions.
//
// Every subscriber channel of the stream is closed and the stream is removed
// from the table. Closing a stream with no subscribers is a no-op.
func (sp *StreamPublisher) Close(streamID string) {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	for _, ch := range sp.streams[streamID] {
		close(ch)
	}
	delete(sp.streams, streamID)
}

// SubscriberCount returns the number of subscribers for a stream (for testing).
func (sp *StreamPublisher) SubscriberCount(streamID string) int {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	return len(sp.streams[streamID])
}