package main import ( "context" "fmt" "sync" "sync/atomic" "time" "github.com/orchard9/stemedb-go/steme" ) // ScenarioConfig holds configuration for all scenarios. type ScenarioConfig struct { Clients []*steme.Client Signers []*steme.Signer Duration time.Duration TargetRPS int Readers int Verbose bool } // generateAssertion creates a test assertion with a unique subject. func generateAssertion(index int, _ int) steme.Assertion { return steme.NewAssertion( fmt.Sprintf("loadtest:subject_%d", index), "benchmark_value", ). WithNumber(float64(index)). WithConfidence(0.95). WithLifecycle(steme.LifecycleApproved). WithSourceClass(steme.SourceClassCommunity). WithSourceHash("1111111111111111111111111111111111111111111111111111111111111111"). Build() } // RunBaselineLatency executes the 10K assertions baseline latency test. // // Targets: // - p50 latency: <50ms // - p99 latency: <200ms // - Error rate: 0% func RunBaselineLatency(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) { const totalAssertions = 10_000 metrics := NewMetrics() client := cfg.Clients[0] fmt.Println("=== Scenario A: Baseline Latency (10K Assertions) ===") fmt.Printf(" Target: 10,000 assertions, p99 < 200ms, 0%% errors\n\n") for i := 0; i < totalAssertions; i++ { select { case <-ctx.Done(): metrics.Stop() return metrics, ctx.Err() default: } assertion := generateAssertion(i, 0) start := time.Now() _, err := client.Assert(ctx, assertion) latency := time.Since(start) metrics.RecordWrite(latency, err) if cfg.Verbose && i%1000 == 0 && i > 0 { fmt.Printf(" Progress: %d/%d (p99: %v)\n", i, totalAssertions, metrics.WriteP99()) } else if !cfg.Verbose && i%2000 == 0 && i > 0 { fmt.Printf(" Progress: %d/%d\n", i, totalAssertions) } } metrics.Stop() fmt.Printf("\n Completed: %d assertions in %v\n", metrics.WriteCount(), metrics.Duration()) fmt.Printf(" p50: %v, p99: %v, max: %v\n", metrics.WriteP50(), metrics.WriteP99(), metrics.WriteMax()) fmt.Printf(" Error rate: %.2f%%\n\n", metrics.WriteErrorRate()) return metrics, nil } // RunSustainedWrites executes the sustained write throughput test. // // Targets: // - Sustained rate: targetRPS for configured duration // - p99 latency: <200ms throughout // - Error rate: <0.1% func RunSustainedWrites(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) { metrics := NewMetrics() fmt.Println("=== Scenario B: Sustained Writes ===") fmt.Printf(" Target: %d writes/sec for %v, p99 < 200ms, <0.1%% errors\n\n", cfg.TargetRPS, cfg.Duration) // Calculate interval between requests interval := time.Second / time.Duration(cfg.TargetRPS) ticker := time.NewTicker(interval) defer ticker.Stop() deadline := time.Now().Add(cfg.Duration) var idx int64 numClients := len(cfg.Clients) // Track in-flight requests to avoid unbounded goroutine growth var inFlight int64 const maxInFlight = 5000 // Cap concurrent requests progressTicker := time.NewTicker(10 * time.Second) defer progressTicker.Stop() for time.Now().Before(deadline) { select { case <-ctx.Done(): // Wait for in-flight requests for atomic.LoadInt64(&inFlight) > 0 { time.Sleep(10 * time.Millisecond) } metrics.Stop() return metrics, ctx.Err() case <-progressTicker.C: elapsed := time.Since(metrics.startTime) remaining := cfg.Duration - elapsed currentRPS := metrics.WriteThroughput() fmt.Printf(" Progress: %v elapsed, %v remaining, %.0f avg RPS, p99: %v\n", elapsed.Round(time.Second), remaining.Round(time.Second), currentRPS, metrics.WriteP99()) case <-ticker.C: // Check if we have too many in-flight requests if atomic.LoadInt64(&inFlight) >= maxInFlight { continue // Skip this tick to let requests complete } i := atomic.AddInt64(&idx, 1) - 1 clientIdx := int(i) % numClients client := cfg.Clients[clientIdx] atomic.AddInt64(&inFlight, 1) go func(index int64, c *steme.Client) { defer atomic.AddInt64(&inFlight, -1) assertion := generateAssertion(int(index), clientIdx) start := time.Now() _, err := c.Assert(ctx, assertion) latency := time.Since(start) metrics.RecordWrite(latency, err) }(i, client) } } // Wait for in-flight requests to complete fmt.Println(" Waiting for in-flight requests...") waitStart := time.Now() for atomic.LoadInt64(&inFlight) > 0 && time.Since(waitStart) < 30*time.Second { time.Sleep(100 * time.Millisecond) } metrics.Stop() fmt.Printf("\n Completed: %d assertions in %v\n", metrics.WriteCount(), metrics.Duration()) fmt.Printf(" Average throughput: %.0f/sec\n", metrics.WriteThroughput()) fmt.Printf(" p50: %v, p99: %v, max: %v\n", metrics.WriteP50(), metrics.WriteP99(), metrics.WriteMax()) fmt.Printf(" Errors: %d (%.3f%%)\n\n", metrics.WriteErrors(), metrics.WriteErrorRate()) return metrics, nil } // RunConcurrentReaders executes the concurrent readers degradation test. // // Targets: // - 100 concurrent readers // - Read latency degradation: <2x baseline // - No reader timeouts func RunConcurrentReaders(ctx context.Context, cfg ScenarioConfig) (*Metrics, error) { metrics := NewMetrics() fmt.Println("=== Scenario C: Concurrent Readers ===") fmt.Printf(" Target: %d concurrent readers, <2x baseline degradation\n\n", cfg.Readers) client := cfg.Clients[0] // First: seed some data to query fmt.Println(" Seeding query data...") const seedCount = 1000 for i := 0; i < seedCount; i++ { assertion := generateAssertion(i+1_000_000, 0) // Offset to avoid collision with other tests _, err := client.Assert(ctx, assertion) if err != nil { return nil, fmt.Errorf("failed to seed data: %w", err) } } // Wait for indexing fmt.Println(" Waiting for indexing...") time.Sleep(2 * time.Second) // Measure baseline read latency (single reader, no writes) fmt.Println(" Measuring baseline read latency...") baselineMetrics := NewMetrics() const baselineSamples = 1000 for i := 0; i < baselineSamples; i++ { subjectIdx := (i % seedCount) + 1_000_000 subject := fmt.Sprintf("loadtest:subject_%d", subjectIdx) start := time.Now() _, err := client.Query(ctx, steme.QueryParams{ Subject: &subject, }) latency := time.Since(start) baselineMetrics.RecordRead(latency, err) } baselineP99 := baselineMetrics.ReadP99() metrics.SetBaselineP99(baselineP99) fmt.Printf(" Baseline p99: %v (from %d samples)\n\n", baselineP99, baselineSamples) // Start background writer fmt.Println(" Starting background writer (100 writes/sec)...") writerCtx, cancelWriter := context.WithCancel(ctx) defer cancelWriter() const backgroundWriteRPS = 100 var writerIdx int64 go func() { ticker := time.NewTicker(time.Second / backgroundWriteRPS) defer ticker.Stop() for { select { case <-writerCtx.Done(): return case <-ticker.C: i := atomic.AddInt64(&writerIdx, 1) - 1 assertion := generateAssertion(int(i)+2_000_000, 0) // Different offset _, _ = client.Assert(writerCtx, assertion) } } }() // Start concurrent readers fmt.Printf(" Starting %d concurrent readers for 30 seconds...\n", cfg.Readers) readerDuration := 30 * time.Second readerDeadline := time.Now().Add(readerDuration) var wg sync.WaitGroup loadedMetrics := NewMetrics() for r := 0; r < cfg.Readers; r++ { wg.Add(1) go func(readerID int) { defer wg.Done() clientIdx := readerID % len(cfg.Clients) c := cfg.Clients[clientIdx] for time.Now().Before(readerDeadline) { select { case <-ctx.Done(): return default: } subjectIdx := (readerID*1000+int(time.Now().UnixNano()%1000))%seedCount + 1_000_000 subject := fmt.Sprintf("loadtest:subject_%d", subjectIdx) start := time.Now() _, err := c.Query(ctx, steme.QueryParams{ Subject: &subject, }) latency := time.Since(start) loadedMetrics.RecordRead(latency, err) // Small delay to avoid hammering too hard time.Sleep(10 * time.Millisecond) } }(r) } wg.Wait() cancelWriter() loadedP99 := loadedMetrics.ReadP99() metrics.SetLoadedP99(loadedP99) // Calculate degradation var degradation float64 if baselineP99 > 0 { degradation = (float64(loadedP99)/float64(baselineP99) - 1) * 100 } metrics.SetDegradation(degradation, cfg.Readers, backgroundWriteRPS) metrics.Stop() fmt.Printf("\n Completed: %d reads across %d readers\n", loadedMetrics.ReadCount(), cfg.Readers) fmt.Printf(" Baseline p99: %v\n", baselineP99) fmt.Printf(" Under-load p99: %v\n", loadedP99) fmt.Printf(" Degradation: %.1f%%\n", degradation) fmt.Printf(" Read errors: %d\n\n", loadedMetrics.ReadErrors()) return metrics, nil }