Claude Config API (v0.6): - Add CRUD endpoints for commands, skills, and agents - Commands/skills/agents stored in /workspace/.claude/ (per-project, in git) - Credentials shared via PVC at /root/.claude/ (shared across pods) - Use base64 encoding for file writes (prevents shell injection) - Add content size limits (1MB max) Security Hardening: - Add sanitize package for command/prompt validation - Add rate limiting middleware (token bucket algorithm) - Add concurrent command limiting - Add input sanitization to all command handlers - Gitignore secrets.yaml and credentials.yaml - Add *.example templates for secrets Testing Infrastructure: - Add testutil package with mocks and fixtures - Add unit tests for auth package (63% coverage) - Add unit tests for executor (47% coverage) - Add handler integration tests (40% coverage) - Add 100% coverage for sanitize, cmdlimit packages - Add 96% coverage for ratelimit package Infrastructure: - Shared Claude credentials PVC (ReadWriteMany) - Reduced workspace PVC size from 20Gi to 5Gi - Add init container cleanup before git clone - Document Longhorn RWX requirements Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
87 lines
2.0 KiB
Go
87 lines
2.0 KiB
Go
package memory
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// StreamPublisher is an in-memory implementation of port.StreamPublisher.
|
|
type StreamPublisher struct {
|
|
mu sync.RWMutex
|
|
streams map[string][]chan port.StreamEvent
|
|
}
|
|
|
|
// NewStreamPublisher creates a new in-memory stream publisher.
|
|
func NewStreamPublisher() *StreamPublisher {
|
|
return &StreamPublisher{
|
|
streams: make(map[string][]chan port.StreamEvent),
|
|
}
|
|
}
|
|
|
|
// Ensure StreamPublisher implements port.StreamPublisher at compile time.
|
|
var _ port.StreamPublisher = (*StreamPublisher)(nil)
|
|
|
|
// Subscribe creates a subscription to events for the given stream ID.
|
|
func (sp *StreamPublisher) Subscribe(streamID string) (<-chan port.StreamEvent, func()) {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
ch := make(chan port.StreamEvent, 100)
|
|
sp.streams[streamID] = append(sp.streams[streamID], ch)
|
|
|
|
// Return cleanup function
|
|
cleanup := func() {
|
|
sp.unsubscribe(streamID, ch)
|
|
}
|
|
|
|
return ch, cleanup
|
|
}
|
|
|
|
func (sp *StreamPublisher) unsubscribe(streamID string, ch chan port.StreamEvent) {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
channels := sp.streams[streamID]
|
|
for i, c := range channels {
|
|
if c == ch {
|
|
sp.streams[streamID] = append(channels[:i], channels[i+1:]...)
|
|
close(ch)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish sends an event to all subscribers of a stream.
|
|
func (sp *StreamPublisher) Publish(streamID string, event port.StreamEvent) {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
for _, ch := range sp.streams[streamID] {
|
|
select {
|
|
case ch <- event:
|
|
default:
|
|
// Channel full, skip
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes a stream and all its subscriptions.
|
|
func (sp *StreamPublisher) Close(streamID string) {
|
|
sp.mu.Lock()
|
|
defer sp.mu.Unlock()
|
|
|
|
for _, ch := range sp.streams[streamID] {
|
|
close(ch)
|
|
}
|
|
delete(sp.streams, streamID)
|
|
}
|
|
|
|
// SubscriberCount returns the number of subscribers for a stream (for testing).
|
|
func (sp *StreamPublisher) SubscriberCount(streamID string) int {
|
|
sp.mu.RLock()
|
|
defer sp.mu.RUnlock()
|
|
|
|
return len(sp.streams[streamID])
|
|
}
|