This commit captures the current state before implementing the composable monorepo template system. Key changes included: Infrastructure: - Add CockroachDB provisioner adapter for database provisioning - Add Redis provisioner adapter for cache provisioning - Add build events system with PostgreSQL storage - Add WebSocket endpoint for real-time build progress Code agent improvements: - Fix Claude Code adapter to use default allowed tools instead of dangerously-skip-permissions - Add context-aware stream closing for cancellation support - Improve parser tests for edge cases Build system: - Add build event constants and metrics - Remove deprecated git_operations.go (replaced by pod_git_operations.go) - Add rollback logic for multi-step provisioning operations Documentation: - Add composable-monorepo feature documentation - Add DNS/Cloudflare service documentation - Update deployment and troubleshooting guides Cookbooks: - Add fullstack-app cookbook - Refactor landing-test with shared library Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
244 lines
6.1 KiB
Go
244 lines
6.1 KiB
Go
package memory
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// StreamPublisher is an in-memory implementation of port.StreamPublisher
|
|
// with event ID generation and replay buffer support.
|
|
type StreamPublisher struct {
|
|
mu sync.RWMutex
|
|
streams map[string]*streamState
|
|
}
|
|
|
|
// subscriber wraps a channel with closed state to prevent send-on-closed-channel races.
|
|
type subscriber struct {
|
|
ch chan port.StreamEvent
|
|
closed atomic.Bool
|
|
mu sync.Mutex // protects close and send operations
|
|
}
|
|
|
|
// trySend attempts to send an event to the subscriber's channel.
|
|
// Returns false if the subscriber is closed or channel is full.
|
|
// This is safe to call concurrently with close operations.
|
|
func (s *subscriber) trySend(event port.StreamEvent) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.closed.Load() {
|
|
return false
|
|
}
|
|
|
|
select {
|
|
case s.ch <- event:
|
|
return true
|
|
default:
|
|
return false // Channel full
|
|
}
|
|
}
|
|
|
|
// doClose closes the subscriber channel safely.
|
|
// This is safe to call concurrently with send operations.
|
|
func (s *subscriber) doClose() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if !s.closed.Swap(true) {
|
|
// Only close if we're the one who set closed to true
|
|
close(s.ch)
|
|
}
|
|
}
|
|
|
|
// streamState holds the state for a single stream.
|
|
type streamState struct {
|
|
subscribers []*subscriber
|
|
eventBuffer []port.StreamEvent // Circular buffer for replay
|
|
eventSeq atomic.Uint64 // Monotonic event sequence
|
|
bufferSize int // Max events to keep for replay
|
|
}
|
|
|
|
// NewStreamPublisher creates a new in-memory stream publisher.
|
|
func NewStreamPublisher() *StreamPublisher {
|
|
return &StreamPublisher{
|
|
streams: make(map[string]*streamState),
|
|
}
|
|
}
|
|
|
|
// Ensure StreamPublisher implements port.StreamPublisher at compile time.
|
|
var _ port.StreamPublisher = (*StreamPublisher)(nil)
|
|
|
|
// getOrCreateStream returns the stream state, creating it if necessary.
|
|
func (sp *StreamPublisher) getOrCreateStream(streamID string) *streamState {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
state = &streamState{
|
|
eventBuffer: make([]port.StreamEvent, 0, 100),
|
|
bufferSize: 100, // Keep last 100 events for replay
|
|
}
|
|
sp.streams[streamID] = state
|
|
}
|
|
return state
|
|
}
|
|
|
|
// Subscribe creates a subscription to events for the given stream ID.
|
|
func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
|
|
return sp.SubscribeFromID(streamID, "")
|
|
}
|
|
|
|
// SubscribeFromID creates a subscription starting from a specific event ID.
|
|
// Events since lastEventID will be replayed before new events are delivered.
|
|
func (sp *StreamPublisher) SubscribeFromID(streamID string, lastEventID string) (<-chan port.StreamEvent, func()) {
|
|
state := sp.getOrCreateStream(streamID)
|
|
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
sub := &subscriber{
|
|
ch: make(chan port.StreamEvent, 100),
|
|
}
|
|
state.subscribers = append(state.subscribers, sub)
|
|
|
|
// Replay events if lastEventID is provided
|
|
if lastEventID != "" {
|
|
go sp.replayEvents(sub, state, lastEventID)
|
|
}
|
|
|
|
// Return cleanup function
|
|
cleanup := func() {
|
|
sp.unsubscribe(streamID, sub)
|
|
}
|
|
|
|
return sub.ch, cleanup
|
|
}
|
|
|
|
// replayEvents sends buffered events that occurred after lastEventID.
|
|
func (sp *StreamPublisher) replayEvents(sub *subscriber, state *streamState, lastEventID string) {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
found := false
|
|
for _, event := range state.eventBuffer {
|
|
if found {
|
|
if !sub.trySend(event) && sub.closed.Load() {
|
|
return // Subscriber closed, stop replay
|
|
}
|
|
}
|
|
if event.ID == lastEventID {
|
|
found = true
|
|
}
|
|
}
|
|
|
|
// If we didn't find the lastEventID (too old), replay all buffered events
|
|
if !found && lastEventID != "" {
|
|
for _, event := range state.eventBuffer {
|
|
if !sub.trySend(event) && sub.closed.Load() {
|
|
return // Subscriber closed, stop replay
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sp *StreamPublisher) unsubscribe(streamID string, sub *subscriber) {
|
|
// Close the subscriber channel safely (handles concurrent sends)
|
|
sub.doClose()
|
|
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
for i, s := range state.subscribers {
|
|
if s == sub {
|
|
state.subscribers = append(state.subscribers[:i], state.subscribers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish sends an event to all subscribers of a stream.
|
|
// Returns the generated event ID.
|
|
func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) string {
|
|
state := sp.getOrCreateStream(streamID)
|
|
|
|
// Generate event ID and populate metadata
|
|
seq := state.eventSeq.Add(1)
|
|
event.ID = fmt.Sprintf("%s:%d", streamID, seq)
|
|
event.Sequence = int64(seq)
|
|
if event.Timestamp.IsZero() {
|
|
event.Timestamp = time.Now()
|
|
}
|
|
if event.TaskID == "" {
|
|
event.TaskID = streamID // Default to stream ID as task ID
|
|
}
|
|
|
|
sp.mu.Lock()
|
|
// Add to buffer for replay
|
|
if len(state.eventBuffer) >= state.bufferSize {
|
|
// Remove oldest event
|
|
state.eventBuffer = state.eventBuffer[1:]
|
|
}
|
|
state.eventBuffer = append(state.eventBuffer, event)
|
|
// Copy subscriber pointers (safe - trySend handles concurrent close)
|
|
subscribers := make([]*subscriber, len(state.subscribers))
|
|
copy(subscribers, state.subscribers)
|
|
sp.mu.Unlock()
|
|
|
|
// Send to all subscribers using thread-safe trySend
|
|
for _, sub := range subscribers {
|
|
sub.trySend(event)
|
|
}
|
|
|
|
return event.ID
|
|
}
|
|
|
|
// Close closes a stream and all its subscriptions.
|
|
func (sp *StreamPublisher) Close(streamID string) {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
for _, sub := range state.subscribers {
|
|
sub.doClose()
|
|
}
|
|
delete(sp.streams, streamID)
|
|
}
|
|
|
|
// SubscriberCount returns the number of subscribers for a stream (for testing).
|
|
func (sp *StreamPublisher) SubscriberCount(streamID string) int {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return 0
|
|
}
|
|
return len(state.subscribers)
|
|
}
|
|
|
|
// BufferedEventCount returns the number of buffered events for a stream (for testing).
|
|
func (sp *StreamPublisher) BufferedEventCount(streamID string) int {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
state, exists := sp.streams[streamID]
|
|
if !exists {
|
|
return 0
|
|
}
|
|
return len(state.eventBuffer)
|
|
}
|