rdev/internal/adapter/memory/stream_publisher.go
jordan c59d348040 chore: prepare for composable monorepo template implementation
This commit captures the current state before implementing the composable
monorepo template system. Key changes included:

Infrastructure:
- Add CockroachDB provisioner adapter for database provisioning
- Add Redis provisioner adapter for cache provisioning
- Add build events system with PostgreSQL storage
- Add WebSocket endpoint for real-time build progress

Code agent improvements:
- Fix Claude Code adapter to use default allowed tools instead of dangerously-skip-permissions
- Add context-aware stream closing for cancellation support
- Improve parser tests for edge cases

Build system:
- Add build event constants and metrics
- Remove deprecated git_operations.go (replaced by pod_git_operations.go)
- Add rollback logic for multi-step provisioning operations

Documentation:
- Add composable-monorepo feature documentation
- Add DNS/Cloudflare service documentation
- Update deployment and troubleshooting guides

Cookbooks:
- Add fullstack-app cookbook
- Refactor landing-test with shared library

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 11:39:28 -07:00

244 lines
6.1 KiB
Go

package memory
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/orchard9/rdev/internal/port"
)
// StreamPublisher is an in-memory implementation of port.StreamPublisher
// with event ID generation and replay buffer support.
//
// Streams are created lazily the first time they are subscribed to or
// published on, and are removed by Close.
type StreamPublisher struct {
// mu guards the streams map as well as each stream's subscribers slice
// and eventBuffer.
mu sync.RWMutex
// streams maps a stream ID to its per-stream state.
streams map[string]*streamState
}
// subscriber wraps a channel with closed state to prevent send-on-closed-channel races.
// All sends go through trySend and the channel is closed only through doClose;
// both take mu, so a send can never overlap with the close.
type subscriber struct {
// ch is the buffered channel handed to the consumer by SubscribeFromID.
ch chan port.StreamEvent
// closed is set to true by doClose and checked by trySend before sending.
closed atomic.Bool
mu sync.Mutex // protects close and send operations
}
// trySend makes a single non-blocking attempt to hand event to the subscriber.
// It reports true only when the event was placed on the channel; a closed
// subscriber or a full channel both yield false and the event is dropped.
// The subscriber mutex is held for the whole attempt so the send cannot
// overlap with doClose.
func (s *subscriber) trySend(event port.StreamEvent) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	delivered := false
	if !s.closed.Load() {
		select {
		case s.ch <- event:
			delivered = true
		default:
			// Channel is full: drop the event rather than block the caller.
		}
	}
	return delivered
}
// doClose marks the subscriber as closed and closes its channel.
// It is idempotent: only the first call closes the channel, later calls
// return immediately. The subscriber mutex serializes it against trySend.
func (s *subscriber) doClose() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if alreadyClosed := s.closed.Swap(true); alreadyClosed {
		return
	}
	close(s.ch)
}
// streamState holds the state for a single stream.
// The subscribers and eventBuffer fields are read and written while holding
// StreamPublisher.mu; eventSeq is atomic.
type streamState struct {
// subscribers lists the active subscriptions; entries are appended by
// SubscribeFromID and removed by unsubscribe.
subscribers []*subscriber
eventBuffer []port.StreamEvent // Bounded replay buffer, oldest event first; Publish drops the oldest entry once bufferSize is reached
eventSeq atomic.Uint64 // Monotonic event sequence, incremented once per Publish
bufferSize int // Max events to keep for replay
}
// NewStreamPublisher returns an empty in-memory stream publisher that is
// ready for use; streams are added on first subscribe or publish.
func NewStreamPublisher() *StreamPublisher {
	sp := new(StreamPublisher)
	sp.streams = make(map[string]*streamState)
	return sp
}
// Compile-time assertion that *StreamPublisher satisfies port.StreamPublisher;
// the build fails here if the interface and the implementation drift apart.
var _ port.StreamPublisher = (*StreamPublisher)(nil)
// getOrCreateStream looks up the state for streamID under the write lock and
// lazily registers a fresh one when the stream is not yet known. New streams
// start with an empty replay buffer limited to the last 100 events.
func (sp *StreamPublisher) getOrCreateStream(streamID string) *streamState {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	if existing, ok := sp.streams[streamID]; ok {
		return existing
	}

	created := &streamState{
		eventBuffer: make([]port.StreamEvent, 0, 100),
		bufferSize:  100, // Keep last 100 events for replay
	}
	sp.streams[streamID] = created
	return created
}
// Subscribe registers a new subscription on the given stream without any
// replay. It returns the channel on which events are delivered and a cleanup
// function that cancels the subscription.
func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
	// An empty last-event ID tells SubscribeFromID not to replay anything.
	const noReplay = ""

	events, cleanup := sp.SubscribeFromID(streamID, noReplay)
	return events, cleanup
}
// SubscribeFromID creates a subscription starting from a specific event ID.
//
// When lastEventID is non-empty, the buffered events that follow it are
// placed on the returned channel before the subscriber becomes visible to
// Publish, so replayed events always precede live ones and no event is
// delivered twice. If lastEventID is no longer in the buffer (too old), the
// whole buffer is replayed. An empty lastEventID disables replay.
//
// It returns the receive-only event channel and a cleanup function that
// closes the channel and removes the subscription.
func (sp *StreamPublisher) SubscribeFromID(streamID string, lastEventID string) (<-chan port.StreamEvent, func()) {
	// getOrCreateStream releases the lock before we re-acquire it, so Close
	// can delete the stream in between. Retry until the state we hold is
	// still the registered one; otherwise the subscriber would be attached
	// to an orphaned stream that never receives events and is never closed.
	state := sp.getOrCreateStream(streamID)
	sp.mu.Lock()
	for sp.streams[streamID] != state {
		sp.mu.Unlock()
		state = sp.getOrCreateStream(streamID)
		sp.mu.Lock()
	}
	defer sp.mu.Unlock()

	sub := &subscriber{
		ch: make(chan port.StreamEvent, 100),
	}

	// Replay synchronously while holding the write lock and before the
	// subscriber is registered. Publish appends to the buffer and snapshots
	// the subscriber list under the same lock, so every event is delivered
	// either by this replay or live, never both. The sends cannot block:
	// trySend is non-blocking, and the channel capacity (100) covers the
	// buffer size that getOrCreateStream configures (100).
	if lastEventID != "" {
		sp.replayEvents(sub, state, lastEventID)
	}
	state.subscribers = append(state.subscribers, sub)

	// Return cleanup function
	cleanup := func() {
		sp.unsubscribe(streamID, sub)
	}
	return sub.ch, cleanup
}

// replayEvents sends buffered events that occurred after lastEventID.
// If lastEventID is not present in the buffer, all buffered events are sent.
// An empty lastEventID replays nothing.
//
// The caller must hold sp.mu, since state.eventBuffer is read here without
// further locking.
func (sp *StreamPublisher) replayEvents(sub *subscriber, state *streamState, lastEventID string) {
	if lastEventID == "" {
		return
	}

	// Start right after the matching event; stay at 0 (replay everything)
	// when the ID is not found.
	start := 0
	for i := range state.eventBuffer {
		if state.eventBuffer[i].ID == lastEventID {
			start = i + 1
			break
		}
	}

	for _, event := range state.eventBuffer[start:] {
		if !sub.trySend(event) && sub.closed.Load() {
			return // Subscriber closed, stop replay
		}
	}
}
// unsubscribe closes sub's channel and removes sub from the subscriber list
// of the stream identified by streamID. It is a no-op for the list when the
// stream no longer exists or sub is not registered on it; the channel is
// closed in every case.
func (sp *StreamPublisher) unsubscribe(streamID string, sub *subscriber) {
	// Close the subscriber channel safely (handles concurrent sends).
	// This happens before taking sp.mu, so the two locks are never nested here.
	sub.doClose()

	sp.mu.Lock()
	defer sp.mu.Unlock()

	state, exists := sp.streams[streamID]
	if !exists {
		return
	}

	subs := state.subscribers
	for i, s := range subs {
		if s != sub {
			continue
		}
		// Shift the tail down and nil the vacated slot. A plain
		// append(subs[:i], subs[i+1:]...) would leave a stale pointer in the
		// backing array and keep the removed subscriber (and its buffered
		// channel) reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		state.subscribers = subs[:last]
		return
	}
}
// Publish sends an event to all subscribers of a stream.
//
// The event is stamped with a generated ID of the form "<streamID>:<seq>",
// its Sequence, a Timestamp (if zero) and a TaskID (defaulting to streamID,
// if empty). It is then appended to the stream's replay buffer and offered
// to every current subscriber without blocking; subscribers whose channel
// is full or closed miss the event. The stream is created if it does not
// exist yet.
//
// Returns the generated event ID.
func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) string {
	state := sp.getOrCreateStream(streamID)

	// Populate caller-independent metadata before taking the lock.
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now()
	}
	if event.TaskID == "" {
		event.TaskID = streamID // Default to stream ID as task ID
	}

	sp.mu.Lock()
	// Assign the sequence number inside the critical section so that the
	// replay buffer is always ordered by sequence. Taking it before the lock
	// lets concurrent publishers append out of order, which makes
	// replay-after-ID skip or repeat events.
	seq := state.eventSeq.Add(1)
	event.ID = fmt.Sprintf("%s:%d", streamID, seq)
	event.Sequence = int64(seq)

	// Add to buffer for replay, dropping the oldest event when full.
	if len(state.eventBuffer) >= state.bufferSize {
		state.eventBuffer = state.eventBuffer[1:]
	}
	state.eventBuffer = append(state.eventBuffer, event)

	// Copy subscriber pointers (safe - trySend handles concurrent close)
	// so the sends below can run without holding sp.mu.
	subscribers := make([]*subscriber, len(state.subscribers))
	copy(subscribers, state.subscribers)
	sp.mu.Unlock()

	// Send to all subscribers using thread-safe trySend
	for _, sub := range subscribers {
		sub.trySend(event)
	}
	return event.ID
}
// Close tears down the stream identified by streamID: the stream is removed
// from the publisher and the channel of every registered subscriber is
// closed. Closing an unknown stream is a no-op.
func (sp *StreamPublisher) Close(streamID string) {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	state, ok := sp.streams[streamID]
	if !ok {
		return
	}

	delete(sp.streams, streamID)
	for i := range state.subscribers {
		state.subscribers[i].doClose()
	}
}
// SubscriberCount reports how many subscribers are currently registered on
// the stream, or 0 when the stream does not exist (for testing).
func (sp *StreamPublisher) SubscriberCount(streamID string) int {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	if state, ok := sp.streams[streamID]; ok {
		return len(state.subscribers)
	}
	return 0
}
// BufferedEventCount reports how many events are currently held in the
// stream's replay buffer, or 0 when the stream does not exist (for testing).
func (sp *StreamPublisher) BufferedEventCount(streamID string) int {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	if state, ok := sp.streams[streamID]; ok {
		return len(state.eventBuffer)
	}
	return 0
}