stemedb/cmd/load-test/scenarios.go
jordan 157dbbb9eb feat: Complete Aphoria Phase 8-9 + UAT suite (90/90 tests passing)
## Phase 8: Enterprise Extractor Improvements 
- 14 security extractors (TLS, JWT, SQL injection, XSS, etc.)
- 10 framework-specific extractors (Spring, Django, Rails, etc.)
- Config file security detection (YAML, TOML)

## Phase 9: Autonomous Extractor Generation 
- Shadow mode executor with TP/FP tracking
- Graduation pipeline with confidence thresholds
- Auto-rollback on regression detection
- Cross-project pattern syncing

## UAT Suite Complete (14 scripts, 90 tests)
- test-core-detection.sh (6 tests)
- test-declarative-extractors.sh (5 tests)
- test-domain-frameworks.sh (5 tests)
- test-domain-unreal.sh (3 tests)
- test-llm-extraction.sh (6 tests)
- test-eval-harness.sh (5 tests)
- test-cross-language.sh (3 tests)
- test-precommit-performance.sh (4 tests)
- test-output-formats.sh (8 tests)
- test-drift-detection.sh (6 tests)
- test-exit-codes.sh (12 tests)
+ 3 more scripts

## Other Changes
- Updated roadmap to mark Phase 8-9 complete
- Added .gitignore entries for build artifacts
- Updated pre-commit: 800 line limit, exclude tests/data/cmd

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 22:50:55 -07:00

303 lines
8.5 KiB
Go

package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/orchard9/stemedb-go/steme"
)
// ScenarioConfig holds configuration for all scenarios.
//
// A single value is passed to every Run* scenario in this file; each scenario
// reads only the fields relevant to it.
type ScenarioConfig struct {
	// Clients are the steme clients used to issue requests. RunBaselineLatency
	// uses only Clients[0]; RunSustainedWrites spreads writes across all of
	// them round-robin; RunConcurrentReaders seeds through Clients[0] and
	// assigns readers to clients by reader ID modulo len(Clients).
	Clients []*steme.Client
	// Signers is not read by any scenario visible in this file.
	// NOTE(review): presumably paired with Clients by index — confirm at the
	// construction site.
	Signers []*steme.Signer
	// Duration is how long RunSustainedWrites keeps dispatching writes.
	Duration time.Duration
	// TargetRPS is the write rate (requests per second) that
	// RunSustainedWrites paces its dispatch ticker to.
	TargetRPS int
	// Readers is the number of concurrent reader goroutines started by
	// RunConcurrentReaders.
	Readers int
	// Verbose makes RunBaselineLatency print progress every 1000 writes
	// (including the running p99) instead of every 2000.
	Verbose bool
}
// generateAssertion builds the benchmark assertion for the given index.
//
// The subject is "loadtest:subject_<index>", so distinct indices yield
// distinct subjects, and the numeric value is the index itself. Confidence,
// lifecycle, source class and source hash are fixed for every assertion.
// The second parameter is accepted but ignored.
func generateAssertion(index int, _ int) steme.Assertion {
	const (
		confidence = 0.95
		sourceHash = "1111111111111111111111111111111111111111111111111111111111111111"
	)

	subject := fmt.Sprintf("loadtest:subject_%d", index)
	value := float64(index)

	return steme.NewAssertion(subject, "benchmark_value").
		WithNumber(value).
		WithConfidence(confidence).
		WithLifecycle(steme.LifecycleApproved).
		WithSourceClass(steme.SourceClassCommunity).
		WithSourceHash(sourceHash).
		Build()
}
// RunBaselineLatency executes the 10K assertions baseline latency test.
//
// It issues 10,000 sequential writes through cfg.Clients[0], timing each
// call and recording the latency and error in the returned Metrics. Progress
// is printed every 1000 writes (with the running p99) when cfg.Verbose is
// set, otherwise every 2000 writes.
//
// It returns an error without running if cfg.Clients is empty. If ctx is
// cancelled between writes, the metrics gathered so far are returned together
// with ctx.Err().
//
// Targets:
//   - p50 latency: <50ms
//   - p99 latency: <200ms
//   - Error rate: 0%
func RunBaselineLatency(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) {
	const totalAssertions = 10_000

	// Indexing Clients[0] below would panic on an empty slice; report the
	// misconfiguration as an error instead.
	if len(cfg.Clients) == 0 {
		return nil, fmt.Errorf("baseline latency: no clients configured")
	}

	metrics := NewMetrics()
	client := cfg.Clients[0]

	fmt.Println("=== Scenario A: Baseline Latency (10K Assertions) ===")
	fmt.Printf(" Target: 10,000 assertions, p99 < 200ms, 0%% errors\n\n")

	// Verbose runs report twice as often as quiet ones.
	progressEvery := 2000
	if cfg.Verbose {
		progressEvery = 1000
	}

	for i := 0; i < totalAssertions; i++ {
		// Stop promptly between writes once the caller cancels.
		if err := ctx.Err(); err != nil {
			metrics.Stop()
			return metrics, err
		}

		assertion := generateAssertion(i, 0)

		// Time only the Assert call itself, not assertion construction.
		start := time.Now()
		_, err := client.Assert(ctx, assertion)
		metrics.RecordWrite(time.Since(start), err)

		if i > 0 && i%progressEvery == 0 {
			if cfg.Verbose {
				fmt.Printf(" Progress: %d/%d (p99: %v)\n", i, totalAssertions, metrics.WriteP99())
			} else {
				fmt.Printf(" Progress: %d/%d\n", i, totalAssertions)
			}
		}
	}

	metrics.Stop()

	fmt.Printf("\n Completed: %d assertions in %v\n", metrics.WriteCount(), metrics.Duration())
	fmt.Printf(" p50: %v, p99: %v, max: %v\n", metrics.WriteP50(), metrics.WriteP99(), metrics.WriteMax())
	fmt.Printf(" Error rate: %.2f%%\n\n", metrics.WriteErrorRate())

	return metrics, nil
}
// RunSustainedWrites executes the sustained write throughput test.
//
// One write is dispatched per tick of a ticker paced at cfg.TargetRPS, spread
// round-robin across cfg.Clients, until cfg.Duration has elapsed or ctx is
// cancelled. Every write runs in its own goroutine; ticks are skipped while
// maxInFlight requests are outstanding so a slow server cannot cause
// unbounded goroutine growth. A progress line is printed every 10 seconds.
//
// It returns an error without running if cfg.Clients is empty or
// cfg.TargetRPS is not positive. On cancellation it waits (bounded) for
// in-flight requests and returns the partial metrics together with ctx.Err().
//
// Targets:
//   - Sustained rate: targetRPS for configured duration
//   - p99 latency: <200ms throughout
//   - Error rate: <0.1%
func RunSustainedWrites(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) {
	const (
		maxInFlight  = 5000             // Cap concurrent requests
		drainTimeout = 30 * time.Second // Upper bound on waiting for stragglers
	)

	// Both values are used as divisors below (interval computation and
	// round-robin client selection); zero would panic at run time.
	numClients := len(cfg.Clients)
	if numClients == 0 {
		return nil, fmt.Errorf("sustained writes: no clients configured")
	}
	if cfg.TargetRPS <= 0 {
		return nil, fmt.Errorf("sustained writes: target RPS must be positive, got %d", cfg.TargetRPS)
	}

	metrics := NewMetrics()

	fmt.Println("=== Scenario B: Sustained Writes ===")
	fmt.Printf(" Target: %d writes/sec for %v, p99 < 200ms, <0.1%% errors\n\n", cfg.TargetRPS, cfg.Duration)

	// Calculate interval between requests. Rates above one per nanosecond
	// truncate to zero, which time.NewTicker rejects with a panic, so clamp
	// to the smallest positive interval.
	interval := time.Second / time.Duration(cfg.TargetRPS)
	if interval <= 0 {
		interval = time.Nanosecond
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	progressTicker := time.NewTicker(10 * time.Second)
	defer progressTicker.Stop()

	deadline := time.Now().Add(cfg.Duration)

	var (
		idx      int64 // next assertion index to dispatch
		inFlight int64 // requests dispatched but not yet completed
	)

	// drain polls the in-flight counter until it reaches zero or
	// drainTimeout elapses, so a request that never returns cannot hang the
	// scenario.
	drain := func(poll time.Duration) {
		waitStart := time.Now()
		for atomic.LoadInt64(&inFlight) > 0 && time.Since(waitStart) < drainTimeout {
			time.Sleep(poll)
		}
	}

	for time.Now().Before(deadline) {
		select {
		case <-ctx.Done():
			// Wait for in-flight requests
			drain(10 * time.Millisecond)
			metrics.Stop()
			return metrics, ctx.Err()

		case <-progressTicker.C:
			elapsed := time.Since(metrics.startTime)
			remaining := cfg.Duration - elapsed
			currentRPS := metrics.WriteThroughput()
			fmt.Printf(" Progress: %v elapsed, %v remaining, %.0f avg RPS, p99: %v\n",
				elapsed.Round(time.Second), remaining.Round(time.Second), currentRPS, metrics.WriteP99())

		case <-ticker.C:
			// Check if we have too many in-flight requests
			if atomic.LoadInt64(&inFlight) >= maxInFlight {
				continue // Skip this tick to let requests complete
			}

			i := atomic.AddInt64(&idx, 1) - 1
			// Reduce in int64 before narrowing so the index stays
			// non-negative even where int is 32 bits wide.
			clientIdx := int(i % int64(numClients))
			client := cfg.Clients[clientIdx]

			atomic.AddInt64(&inFlight, 1)
			go func(index int64, slot int, c *steme.Client) {
				defer atomic.AddInt64(&inFlight, -1)

				assertion := generateAssertion(int(index), slot)

				start := time.Now()
				_, err := c.Assert(ctx, assertion)
				metrics.RecordWrite(time.Since(start), err)
			}(i, clientIdx, client)
		}
	}

	// Wait for in-flight requests to complete
	fmt.Println(" Waiting for in-flight requests...")
	drain(100 * time.Millisecond)

	metrics.Stop()

	fmt.Printf("\n Completed: %d assertions in %v\n", metrics.WriteCount(), metrics.Duration())
	fmt.Printf(" Average throughput: %.0f/sec\n", metrics.WriteThroughput())
	fmt.Printf(" p50: %v, p99: %v, max: %v\n", metrics.WriteP50(), metrics.WriteP99(), metrics.WriteMax())
	fmt.Printf(" Errors: %d (%.3f%%)\n\n", metrics.WriteErrors(), metrics.WriteErrorRate())

	return metrics, nil
}
// RunConcurrentReaders executes the concurrent readers degradation test.
//
// The scenario runs in four phases:
//  1. Seed 1000 assertions through cfg.Clients[0] and pause 2s for indexing.
//  2. Measure a single-reader baseline read p99 over 1000 queries.
//  3. Start a background writer at 100 writes/sec and cfg.Readers reader
//     goroutines that query seeded subjects for 30 seconds.
//  4. Record the under-load p99 and the degradation, expressed as the
//     percentage increase of the under-load p99 over the baseline p99
//     (left at 0 when the baseline p99 is zero).
//
// It returns an error without running if cfg.Clients is empty, and a wrapped
// error with nil metrics if seeding fails. If ctx is cancelled after seeding,
// the returned metrics are stopped and ctx.Err() is returned with them.
//
// Targets:
//   - 100 concurrent readers
//   - Read latency degradation: <2x baseline
//   - No reader timeouts
func RunConcurrentReaders(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) {
	const (
		seedCount          = 1000
		seedOffset         = 1_000_000 // Offset to avoid collision with other tests
		writerOffset       = 2_000_000 // Different offset for background writes
		baselineSamples    = 1000
		backgroundWriteRPS = 100
		indexingDelay      = 2 * time.Second
		readerDuration     = 30 * time.Second
	)

	// Indexing Clients[0] and taking readerID % len(Clients) below would
	// both panic on an empty slice.
	if len(cfg.Clients) == 0 {
		return nil, fmt.Errorf("concurrent readers: no clients configured")
	}

	metrics := NewMetrics()

	fmt.Println("=== Scenario C: Concurrent Readers ===")
	fmt.Printf(" Target: %d concurrent readers, <2x baseline degradation\n\n", cfg.Readers)

	client := cfg.Clients[0]

	// First: seed some data to query
	fmt.Println(" Seeding query data...")
	for i := 0; i < seedCount; i++ {
		assertion := generateAssertion(i+seedOffset, 0)
		if _, err := client.Assert(ctx, assertion); err != nil {
			return nil, fmt.Errorf("failed to seed data: %w", err)
		}
	}

	// Wait for indexing, but give up early if the caller cancels.
	fmt.Println(" Waiting for indexing...")
	indexTimer := time.NewTimer(indexingDelay)
	select {
	case <-ctx.Done():
		indexTimer.Stop()
		metrics.Stop()
		return metrics, ctx.Err()
	case <-indexTimer.C:
	}

	// Measure baseline read latency (single reader, no writes)
	fmt.Println(" Measuring baseline read latency...")
	baselineMetrics := NewMetrics()
	for i := 0; i < baselineSamples; i++ {
		// A cancelled run would otherwise record up to baselineSamples
		// failed queries before noticing.
		if err := ctx.Err(); err != nil {
			metrics.Stop()
			return metrics, err
		}

		subject := fmt.Sprintf("loadtest:subject_%d", (i%seedCount)+seedOffset)

		start := time.Now()
		_, err := client.Query(ctx, steme.QueryParams{
			Subject: &subject,
		})
		baselineMetrics.RecordRead(time.Since(start), err)
	}

	baselineP99 := baselineMetrics.ReadP99()
	metrics.SetBaselineP99(baselineP99)
	fmt.Printf(" Baseline p99: %v (from %d samples)\n\n", baselineP99, baselineSamples)

	// Start background writer
	fmt.Println(" Starting background writer (100 writes/sec)...")
	writerCtx, cancelWriter := context.WithCancel(ctx)
	defer cancelWriter()

	var writerIdx int64
	go func() {
		ticker := time.NewTicker(time.Second / backgroundWriteRPS)
		defer ticker.Stop()

		for {
			select {
			case <-writerCtx.Done():
				return
			case <-ticker.C:
				i := atomic.AddInt64(&writerIdx, 1) - 1
				assertion := generateAssertion(int(i)+writerOffset, 0)
				// Result and error are discarded on purpose: these writes
				// exist only to create load while reads are measured.
				_, _ = client.Assert(writerCtx, assertion)
			}
		}
	}()

	// Start concurrent readers
	fmt.Printf(" Starting %d concurrent readers for 30 seconds...\n", cfg.Readers)
	readerDeadline := time.Now().Add(readerDuration)

	var wg sync.WaitGroup
	loadedMetrics := NewMetrics()

	for r := 0; r < cfg.Readers; r++ {
		wg.Add(1)
		go func(readerID int) {
			defer wg.Done()

			c := cfg.Clients[readerID%len(cfg.Clients)]

			for time.Now().Before(readerDeadline) {
				if ctx.Err() != nil {
					return
				}

				// NOTE(review): readerID*1000 is a multiple of seedCount, so
				// it vanishes under the modulo and the subject is chosen by
				// the clock's sub-microsecond digits alone.
				subjectIdx := (readerID*1000+int(time.Now().UnixNano()%1000))%seedCount + seedOffset
				subject := fmt.Sprintf("loadtest:subject_%d", subjectIdx)

				start := time.Now()
				_, err := c.Query(ctx, steme.QueryParams{
					Subject: &subject,
				})
				loadedMetrics.RecordRead(time.Since(start), err)

				// Small delay to avoid hammering too hard
				time.Sleep(10 * time.Millisecond)
			}
		}(r)
	}

	wg.Wait()
	cancelWriter()

	// Readers bail out early on cancellation, so the figures below would be
	// computed from a truncated run. Surface the cancellation the same way
	// the other scenarios do.
	if err := ctx.Err(); err != nil {
		metrics.Stop()
		return metrics, err
	}

	loadedP99 := loadedMetrics.ReadP99()
	metrics.SetLoadedP99(loadedP99)

	// Calculate degradation
	var degradation float64
	if baselineP99 > 0 {
		degradation = (float64(loadedP99)/float64(baselineP99) - 1) * 100
	}
	metrics.SetDegradation(degradation, cfg.Readers, backgroundWriteRPS)

	metrics.Stop()

	fmt.Printf("\n Completed: %d reads across %d readers\n", loadedMetrics.ReadCount(), cfg.Readers)
	fmt.Printf(" Baseline p99: %v\n", baselineP99)
	fmt.Printf(" Under-load p99: %v\n", loadedP99)
	fmt.Printf(" Degradation: %.1f%%\n", degradation)
	fmt.Printf(" Read errors: %d\n\n", loadedMetrics.ReadErrors())

	return metrics, nil
}