package memory

import (
	"sync"
	"testing"
	"time"

	"github.com/orchard9/rdev/internal/port"
)

// collectStreamEvents reads from ch until want events have arrived or timeout
// elapses, and returns whatever was received, in arrival order. It does not
// distinguish a closed channel from a delivered zero-value event.
func collectStreamEvents(ch <-chan port.StreamEvent, want int, timeout time.Duration) []port.StreamEvent {
	received := make([]port.StreamEvent, 0, want)
	deadline := time.After(timeout)
	for len(received) < want {
		select {
		case e := <-ch:
			received = append(received, e)
		case <-deadline:
			return received
		}
	}
	return received
}

// TestStreamPublisher_PublishAndSubscribe checks that an event published after
// subscribing reaches the subscriber with its type and data intact and with
// the ID that Publish returned.
func TestStreamPublisher_PublishAndSubscribe(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-stream-1"

	// Subscribe before publishing so the event arrives on the live channel.
	ch, cleanup := sp.Subscribe(streamID)
	defer cleanup()

	event := port.StreamEvent{
		Type: "output",
		Data: map[string]any{"line": "hello"},
	}
	eventID := sp.Publish(streamID, event)

	select {
	case received := <-ch:
		if received.Type != "output" {
			t.Errorf("Type = %q, want %q", received.Type, "output")
		}
		if received.ID != eventID {
			t.Errorf("ID = %q, want %q", received.ID, eventID)
		}
		if received.Data["line"] != "hello" {
			// %v rather than %q: the value is read from the event's Data map
			// and may be nil or a non-string when the assertion fails.
			t.Errorf("Data[line] = %v, want %q", received.Data["line"], "hello")
		}
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for event")
	}
}

// TestStreamPublisher_EventIDGeneration checks that Publish returns distinct,
// non-empty IDs for successive events on the same stream.
func TestStreamPublisher_EventIDGeneration(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-stream-ids"

	id1 := sp.Publish(streamID, port.StreamEvent{Type: "e1"})
	id2 := sp.Publish(streamID, port.StreamEvent{Type: "e2"})
	id3 := sp.Publish(streamID, port.StreamEvent{Type: "e3"})

	// IDs must be pairwise distinct.
	if id1 == id2 || id2 == id3 || id1 == id3 {
		t.Errorf("Event IDs should be unique: %q, %q, %q", id1, id2, id3)
	}

	// IDs must be non-empty.
	for _, id := range []string{id1, id2, id3} {
		if len(id) == 0 {
			t.Error("Event ID should not be empty")
		}
	}
}

// TestStreamPublisher_MultipleSubscribers checks that every subscriber of a
// stream is counted and receives a published event.
func TestStreamPublisher_MultipleSubscribers(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-multi-sub"

	ch1, cleanup1 := sp.Subscribe(streamID)
	defer cleanup1()
	ch2, cleanup2 := sp.Subscribe(streamID)
	defer cleanup2()
	ch3, cleanup3 := sp.Subscribe(streamID)
	defer cleanup3()

	if count := sp.SubscriberCount(streamID); count != 3 {
		t.Errorf("SubscriberCount = %d, want 3", count)
	}

	sp.Publish(streamID, port.StreamEvent{Type: "broadcast"})

	// Each subscriber gets its own one-second budget to receive the event.
	for i, ch := range []<-chan port.StreamEvent{ch1, ch2, ch3} {
		select {
		case e := <-ch:
			if e.Type != "broadcast" {
				t.Errorf("Subscriber %d: Type = %q, want %q", i+1, e.Type, "broadcast")
			}
		case <-time.After(time.Second):
			t.Errorf("Subscriber %d: Timeout waiting for event", i+1)
		}
	}
}

// TestStreamPublisher_SubscriberCleanup checks that calling the cleanup func
// returned by Subscribe drops the subscriber count to zero and closes the
// subscriber's channel.
func TestStreamPublisher_SubscriberCleanup(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-cleanup"

	ch, cleanup := sp.Subscribe(streamID)

	if count := sp.SubscriberCount(streamID); count != 1 {
		t.Errorf("SubscriberCount before cleanup = %d, want 1", count)
	}

	cleanup()

	if count := sp.SubscriberCount(streamID); count != 0 {
		t.Errorf("SubscriberCount after cleanup = %d, want 0", count)
	}

	// A closed channel yields immediately with ok == false; hitting the
	// default branch means the channel is still open and empty.
	select {
	case _, ok := <-ch:
		if ok {
			t.Error("Channel should be closed after cleanup")
		}
	default:
		t.Error("Channel should be closed (not blocked)")
	}
}

// TestStreamPublisher_EventReplay checks that SubscribeFromID delivers the
// buffered events published after the given event ID, in publish order.
func TestStreamPublisher_EventReplay(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-replay"

	// Publish before subscribing so the events can only arrive via replay.
	id1 := sp.Publish(streamID, port.StreamEvent{Type: "event1", Data: map[string]any{"n": 1}})
	sp.Publish(streamID, port.StreamEvent{Type: "event2", Data: map[string]any{"n": 2}})
	sp.Publish(streamID, port.StreamEvent{Type: "event3", Data: map[string]any{"n": 3}})

	ch, cleanup := sp.SubscribeFromID(streamID, id1)
	defer cleanup()

	// Expect event2 and event3 only; event1 is the replay starting point.
	received := collectStreamEvents(ch, 2, time.Second)

	// Fatal rather than Error: the index expressions below need two elements.
	if len(received) != 2 {
		t.Fatalf("Expected 2 replayed events, got %d", len(received))
	}
	if received[0].Data["n"] != 2 {
		t.Errorf("First replayed event data = %v, want n=2", received[0].Data)
	}
	if received[1].Data["n"] != 3 {
		t.Errorf("Second replayed event data = %v, want n=3", received[1].Data)
	}
}

// TestStreamPublisher_EventBuffer checks that published events are retained
// and reported by BufferedEventCount.
func TestStreamPublisher_EventBuffer(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-buffer"

	for i := 0; i < 50; i++ {
		sp.Publish(streamID, port.StreamEvent{Type: "event"})
	}

	if count := sp.BufferedEventCount(streamID); count != 50 {
		t.Errorf("BufferedEventCount = %d, want 50", count)
	}
}

// TestStreamPublisher_BufferOverflow checks that the per-stream buffer stops
// growing once it holds 100 events.
func TestStreamPublisher_BufferOverflow(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-overflow"

	// Publish more events than the expected buffer size (100).
	for i := 0; i < 150; i++ {
		sp.Publish(streamID, port.StreamEvent{Type: "event"})
	}

	if count := sp.BufferedEventCount(streamID); count != 100 {
		t.Errorf("BufferedEventCount = %d, want 100 (buffer cap)", count)
	}
}

// TestStreamPublisher_Close checks that closing a stream closes subscriber
// channels and resets the subscriber count.
func TestStreamPublisher_Close(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-close"

	// The cleanup func is deliberately unused: this test asserts that Close
	// alone releases the subscriber.
	ch, _ := sp.Subscribe(streamID)

	sp.Close(streamID)

	select {
	case _, ok := <-ch:
		if ok {
			t.Error("Channel should be closed")
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("Channel should be closed (not blocked)")
	}

	if count := sp.SubscriberCount(streamID); count != 0 {
		t.Errorf("SubscriberCount after close = %d, want 0", count)
	}
}

// TestStreamPublisher_IndependentStreams checks that an event published to one
// stream is not delivered to a subscriber of a different stream.
func TestStreamPublisher_IndependentStreams(t *testing.T) {
	sp := NewStreamPublisher()

	ch1, cleanup1 := sp.Subscribe("stream-a")
	defer cleanup1()
	ch2, cleanup2 := sp.Subscribe("stream-b")
	defer cleanup2()

	// Publish to stream-a only.
	sp.Publish("stream-a", port.StreamEvent{Type: "for-a"})

	select {
	case e := <-ch1:
		if e.Type != "for-a" {
			t.Errorf("Stream-a received wrong event: %q", e.Type)
		}
	case <-time.After(time.Second):
		t.Error("Stream-a subscriber should receive event")
	}

	select {
	case e := <-ch2:
		t.Errorf("Stream-b should not receive event from stream-a, got: %v", e)
	case <-time.After(100 * time.Millisecond):
		// Expected: nothing arrives for stream-b within the wait window.
	}
}

// TestStreamPublisher_ConcurrentPublish checks that a subscriber receives every
// event when several goroutines publish to the same stream at once.
func TestStreamPublisher_ConcurrentPublish(t *testing.T) {
	const (
		publishers         = 10
		eventsPerPublisher = 10
		totalEvents        = publishers * eventsPerPublisher
	)

	sp := NewStreamPublisher()
	streamID := "test-concurrent"

	ch, cleanup := sp.Subscribe(streamID)
	defer cleanup()

	var wg sync.WaitGroup
	for i := 0; i < publishers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < eventsPerPublisher; j++ {
				sp.Publish(streamID, port.StreamEvent{Type: "concurrent"})
			}
		}()
	}

	// Count deliveries in a separate goroutine so receiving overlaps with
	// publishing. The count travels over the channel, so no variable is shared
	// between the collector and the test goroutine.
	result := make(chan int, 1)
	go func() {
		timeout := time.After(5 * time.Second)
		n := 0
		for n < totalEvents {
			select {
			case <-ch:
				n++
			case <-timeout:
				result <- n
				return
			}
		}
		result <- n
	}()

	wg.Wait()

	if n := <-result; n != totalEvents {
		t.Errorf("Expected %d events, received %d", totalEvents, n)
	}
}

// TestStreamPublisher_ConcurrentSubscribeUnsubscribe exercises Subscribe,
// cleanup and Publish from many goroutines at once. It makes no assertions;
// it passes if everything returns, and is most useful under -race.
func TestStreamPublisher_ConcurrentSubscribeUnsubscribe(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-sub-unsub"

	var wg sync.WaitGroup

	// Short-lived subscribers.
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, cleanup := sp.Subscribe(streamID)
			// Hold the subscription briefly so it overlaps with publishing.
			time.Sleep(10 * time.Millisecond)
			cleanup()
		}()
	}

	// Concurrent publishers.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				sp.Publish(streamID, port.StreamEvent{Type: "test"})
			}
		}()
	}

	wg.Wait()
	// Test passes if no race/deadlock.
}

// TestStreamPublisher_ReplayFromUnknownID checks that subscribing from an ID
// that was never issued replays all buffered events.
func TestStreamPublisher_ReplayFromUnknownID(t *testing.T) {
	sp := NewStreamPublisher()
	streamID := "test-unknown-replay"

	sp.Publish(streamID, port.StreamEvent{Type: "e1", Data: map[string]any{"n": 1}})
	sp.Publish(streamID, port.StreamEvent{Type: "e2", Data: map[string]any{"n": 2}})

	// Subscribe from an ID that doesn't exist (should replay all).
	ch, cleanup := sp.SubscribeFromID(streamID, "nonexistent-id")
	defer cleanup()

	received := collectStreamEvents(ch, 2, time.Second)

	if len(received) != 2 {
		t.Errorf("Expected 2 events (full replay), got %d", len(received))
	}
}