Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
372 lines
8.4 KiB
Go
372 lines
8.4 KiB
Go
package memory
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
func TestStreamPublisher_PublishAndSubscribe(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-stream-1"
|
|
|
|
// Subscribe first
|
|
ch, cleanup := sp.Subscribe(streamID)
|
|
defer cleanup()
|
|
|
|
// Publish an event
|
|
event := port.StreamEvent{
|
|
Type: "output",
|
|
Data: map[string]any{"line": "hello"},
|
|
}
|
|
eventID := sp.Publish(streamID, event)
|
|
|
|
// Receive the event
|
|
select {
|
|
case received := <-ch:
|
|
if received.Type != "output" {
|
|
t.Errorf("Type = %q, want %q", received.Type, "output")
|
|
}
|
|
if received.ID != eventID {
|
|
t.Errorf("ID = %q, want %q", received.ID, eventID)
|
|
}
|
|
if received.Data["line"] != "hello" {
|
|
t.Errorf("Data[line] = %q, want %q", received.Data["line"], "hello")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Timeout waiting for event")
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_EventIDGeneration(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-stream-ids"
|
|
|
|
id1 := sp.Publish(streamID, port.StreamEvent{Type: "e1"})
|
|
id2 := sp.Publish(streamID, port.StreamEvent{Type: "e2"})
|
|
id3 := sp.Publish(streamID, port.StreamEvent{Type: "e3"})
|
|
|
|
// IDs should be sequential
|
|
if id1 == id2 || id2 == id3 || id1 == id3 {
|
|
t.Errorf("Event IDs should be unique: %q, %q, %q", id1, id2, id3)
|
|
}
|
|
|
|
// IDs should contain stream ID
|
|
for _, id := range []string{id1, id2, id3} {
|
|
if len(id) == 0 {
|
|
t.Error("Event ID should not be empty")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_MultipleSubscribers(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-multi-sub"
|
|
|
|
// Create multiple subscribers
|
|
ch1, cleanup1 := sp.Subscribe(streamID)
|
|
defer cleanup1()
|
|
|
|
ch2, cleanup2 := sp.Subscribe(streamID)
|
|
defer cleanup2()
|
|
|
|
ch3, cleanup3 := sp.Subscribe(streamID)
|
|
defer cleanup3()
|
|
|
|
// Verify subscriber count
|
|
if count := sp.SubscriberCount(streamID); count != 3 {
|
|
t.Errorf("SubscriberCount = %d, want 3", count)
|
|
}
|
|
|
|
// Publish an event
|
|
sp.Publish(streamID, port.StreamEvent{Type: "broadcast"})
|
|
|
|
// All subscribers should receive
|
|
for i, ch := range []<-chan port.StreamEvent{ch1, ch2, ch3} {
|
|
select {
|
|
case e := <-ch:
|
|
if e.Type != "broadcast" {
|
|
t.Errorf("Subscriber %d: Type = %q, want %q", i+1, e.Type, "broadcast")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Errorf("Subscriber %d: Timeout waiting for event", i+1)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_SubscriberCleanup(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-cleanup"
|
|
|
|
ch, cleanup := sp.Subscribe(streamID)
|
|
|
|
if count := sp.SubscriberCount(streamID); count != 1 {
|
|
t.Errorf("SubscriberCount before cleanup = %d, want 1", count)
|
|
}
|
|
|
|
// Cleanup
|
|
cleanup()
|
|
|
|
if count := sp.SubscriberCount(streamID); count != 0 {
|
|
t.Errorf("SubscriberCount after cleanup = %d, want 0", count)
|
|
}
|
|
|
|
// Channel should be closed
|
|
select {
|
|
case _, ok := <-ch:
|
|
if ok {
|
|
t.Error("Channel should be closed after cleanup")
|
|
}
|
|
default:
|
|
t.Error("Channel should be closed (not blocked)")
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_EventReplay(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-replay"
|
|
|
|
// Publish some events before subscribing
|
|
id1 := sp.Publish(streamID, port.StreamEvent{Type: "event1", Data: map[string]any{"n": 1}})
|
|
sp.Publish(streamID, port.StreamEvent{Type: "event2", Data: map[string]any{"n": 2}})
|
|
sp.Publish(streamID, port.StreamEvent{Type: "event3", Data: map[string]any{"n": 3}})
|
|
|
|
// Subscribe from id1 - should replay events after id1
|
|
ch, cleanup := sp.SubscribeFromID(streamID, id1)
|
|
defer cleanup()
|
|
|
|
// Give replay goroutine time to run
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Should receive event2 and event3 (not event1 since we're replaying from id1)
|
|
var received []port.StreamEvent
|
|
timeout := time.After(time.Second)
|
|
|
|
loop:
|
|
for {
|
|
select {
|
|
case e := <-ch:
|
|
received = append(received, e)
|
|
if len(received) >= 2 {
|
|
break loop
|
|
}
|
|
case <-timeout:
|
|
break loop
|
|
}
|
|
}
|
|
|
|
if len(received) != 2 {
|
|
t.Fatalf("Expected 2 replayed events, got %d", len(received))
|
|
}
|
|
|
|
if received[0].Data["n"] != 2 {
|
|
t.Errorf("First replayed event data = %v, want n=2", received[0].Data)
|
|
}
|
|
if received[1].Data["n"] != 3 {
|
|
t.Errorf("Second replayed event data = %v, want n=3", received[1].Data)
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_EventBuffer(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-buffer"
|
|
|
|
// Publish events
|
|
for i := 0; i < 50; i++ {
|
|
sp.Publish(streamID, port.StreamEvent{Type: "event"})
|
|
}
|
|
|
|
if count := sp.BufferedEventCount(streamID); count != 50 {
|
|
t.Errorf("BufferedEventCount = %d, want 50", count)
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_BufferOverflow(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-overflow"
|
|
|
|
// Publish more events than buffer size (100)
|
|
for i := 0; i < 150; i++ {
|
|
sp.Publish(streamID, port.StreamEvent{Type: "event"})
|
|
}
|
|
|
|
// Buffer should be capped at 100
|
|
if count := sp.BufferedEventCount(streamID); count != 100 {
|
|
t.Errorf("BufferedEventCount = %d, want 100 (buffer cap)", count)
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_Close(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-close"
|
|
|
|
ch, _ := sp.Subscribe(streamID)
|
|
|
|
// Close the stream
|
|
sp.Close(streamID)
|
|
|
|
// Channel should be closed
|
|
select {
|
|
case _, ok := <-ch:
|
|
if ok {
|
|
t.Error("Channel should be closed")
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("Channel should be closed (not blocked)")
|
|
}
|
|
|
|
// Subscriber count should be 0
|
|
if count := sp.SubscriberCount(streamID); count != 0 {
|
|
t.Errorf("SubscriberCount after close = %d, want 0", count)
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_IndependentStreams(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
|
|
ch1, cleanup1 := sp.Subscribe("stream-a")
|
|
defer cleanup1()
|
|
|
|
ch2, cleanup2 := sp.Subscribe("stream-b")
|
|
defer cleanup2()
|
|
|
|
// Publish to stream-a only
|
|
sp.Publish("stream-a", port.StreamEvent{Type: "for-a"})
|
|
|
|
// stream-a subscriber should receive
|
|
select {
|
|
case e := <-ch1:
|
|
if e.Type != "for-a" {
|
|
t.Errorf("Stream-a received wrong event: %q", e.Type)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Error("Stream-a subscriber should receive event")
|
|
}
|
|
|
|
// stream-b subscriber should NOT receive
|
|
select {
|
|
case e := <-ch2:
|
|
t.Errorf("Stream-b should not receive event from stream-a, got: %v", e)
|
|
case <-time.After(100 * time.Millisecond):
|
|
// Expected - no event for stream-b
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_ConcurrentPublish(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-concurrent"
|
|
|
|
ch, cleanup := sp.Subscribe(streamID)
|
|
defer cleanup()
|
|
|
|
// Concurrent publishers
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
for j := 0; j < 10; j++ {
|
|
sp.Publish(streamID, port.StreamEvent{Type: "concurrent"})
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Collect events
|
|
done := make(chan bool)
|
|
var received int
|
|
go func() {
|
|
timeout := time.After(5 * time.Second)
|
|
for {
|
|
select {
|
|
case <-ch:
|
|
received++
|
|
if received >= 100 {
|
|
done <- true
|
|
return
|
|
}
|
|
case <-timeout:
|
|
done <- false
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
success := <-done
|
|
|
|
if !success {
|
|
t.Errorf("Expected 100 events, received %d", received)
|
|
}
|
|
}
|
|
|
|
func TestStreamPublisher_ConcurrentSubscribeUnsubscribe(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-sub-unsub"
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Concurrent subscribe/unsubscribe
|
|
for i := 0; i < 50; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, cleanup := sp.Subscribe(streamID)
|
|
time.Sleep(10 * time.Millisecond)
|
|
cleanup()
|
|
}()
|
|
}
|
|
|
|
// Concurrent publish
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < 10; j++ {
|
|
sp.Publish(streamID, port.StreamEvent{Type: "test"})
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
// Test passes if no race/deadlock
|
|
}
|
|
|
|
func TestStreamPublisher_ReplayFromUnknownID(t *testing.T) {
|
|
sp := NewStreamPublisher()
|
|
streamID := "test-unknown-replay"
|
|
|
|
// Publish some events
|
|
sp.Publish(streamID, port.StreamEvent{Type: "e1", Data: map[string]any{"n": 1}})
|
|
sp.Publish(streamID, port.StreamEvent{Type: "e2", Data: map[string]any{"n": 2}})
|
|
|
|
// Subscribe from an ID that doesn't exist (should replay all)
|
|
ch, cleanup := sp.SubscribeFromID(streamID, "nonexistent-id")
|
|
defer cleanup()
|
|
|
|
// Give replay time
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Should receive all buffered events
|
|
var received []port.StreamEvent
|
|
timeout := time.After(time.Second)
|
|
|
|
loop:
|
|
for {
|
|
select {
|
|
case e := <-ch:
|
|
received = append(received, e)
|
|
if len(received) >= 2 {
|
|
break loop
|
|
}
|
|
case <-timeout:
|
|
break loop
|
|
}
|
|
}
|
|
|
|
if len(received) != 2 {
|
|
t.Errorf("Expected 2 events (full replay), got %d", len(received))
|
|
}
|
|
}
|