Major additions: - Community Next.js app (port 18187) for browsing claims with API docs - stemedb-chaos crate: Fault injection, chaos testing, CRDT properties - Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents - Disputed claims handling: Manual review workflows and validation - Aphoria security scanner: New extractors (SQL injection, command injection, weak crypto, TLS version), policy-based ignores, UAT reports - Docker infrastructure: Dockerfile, docker-compose.yml for full stack - VulnBank demo: Intentionally vulnerable multi-language test corpus SDK & API enhancements: - Source registry handlers for tracking data provenance - Metrics endpoint - Skeptic filtering improvements Code quality: - Split 14 large files (>500 lines) into focused modules - All files now under 500-line limit per project guidelines Documentation: - Chaos testing guide, circuit breakers, observability docs - Phase 7 UAT documentation updates - Martin Kleppmann technical writer agent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
161 lines
5.3 KiB
Rust
161 lines
5.3 KiB
Rust
//! Episteme (StemeDB) API server binary.
|
|
//!
|
|
//! This starts the HTTP API server with the following components:
|
|
//! 1. Opens Journal (WAL) for writes (via GroupCommitBuffer) and reads
|
|
//! 2. Opens HybridStore (KV storage)
|
|
//! 3. Spawns IngestWorker background task to tail WAL
|
|
//! 4. Starts axum HTTP server with OpenAPI documentation
|
|
//! 5. Optionally enables The Meter (economic throttling)
|
|
//!
|
|
//! # Environment Variables
|
|
//!
|
|
//! | Variable | Default | Description |
|
|
//! |----------|---------|-------------|
|
|
//! | `STEMEDB_WAL_DIR` | `data/wal` | Directory for WAL files |
|
|
//! | `STEMEDB_DB_DIR` | `data/db` | Directory for KV store |
|
|
//! | `STEMEDB_BIND_ADDR` | `127.0.0.1:18180` | HTTP server bind address |
|
|
//! | `STEMEDB_METER_ENABLED` | `true` | Enable economic throttling |
|
|
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tracing::{error, info};
|
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
|
|
use axum::Extension;
|
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
|
use stemedb_api::{create_router, create_router_with_meter, AppState};
|
|
use stemedb_ingest::worker::IngestWorker;
|
|
use stemedb_storage::HybridStore;
|
|
use stemedb_wal::Journal;
|
|
|
|
/// Server configuration.
|
|
#[derive(Debug, Clone)]
|
|
struct Config {
|
|
/// Directory for WAL files
|
|
wal_dir: PathBuf,
|
|
|
|
/// Directory for KV store
|
|
db_dir: PathBuf,
|
|
|
|
/// HTTP server bind address
|
|
bind_addr: String,
|
|
|
|
/// Enable economic throttling (The Meter)
|
|
meter_enabled: bool,
|
|
}
|
|
|
|
impl Default for Config {
|
|
fn default() -> Self {
|
|
Self {
|
|
wal_dir: PathBuf::from("data/wal"),
|
|
db_dir: PathBuf::from("data/db"),
|
|
bind_addr: "127.0.0.1:18180".to_string(),
|
|
meter_enabled: true,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Config {
|
|
/// Load configuration from environment variables.
|
|
fn from_env() -> Self {
|
|
let mut config = Self::default();
|
|
|
|
if let Ok(wal_dir) = std::env::var("STEMEDB_WAL_DIR") {
|
|
config.wal_dir = PathBuf::from(wal_dir);
|
|
}
|
|
|
|
if let Ok(db_dir) = std::env::var("STEMEDB_DB_DIR") {
|
|
config.db_dir = PathBuf::from(db_dir);
|
|
}
|
|
|
|
if let Ok(bind_addr) = std::env::var("STEMEDB_BIND_ADDR") {
|
|
config.bind_addr = bind_addr;
|
|
}
|
|
|
|
if let Ok(meter_enabled) = std::env::var("STEMEDB_METER_ENABLED") {
|
|
config.meter_enabled = meter_enabled.to_lowercase() != "false" && meter_enabled != "0";
|
|
}
|
|
|
|
config
|
|
}
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
// Initialize tracing
|
|
let env_filter = match tracing_subscriber::EnvFilter::try_from_default_env() {
|
|
Ok(filter) => filter,
|
|
Err(_) => "stemedb_api=debug,tower_http=debug".into(),
|
|
};
|
|
|
|
tracing_subscriber::registry().with(env_filter).with(tracing_subscriber::fmt::layer()).init();
|
|
|
|
// Initialize Prometheus metrics recorder (must be done before any metrics are recorded)
|
|
let prometheus_handle = PrometheusBuilder::new()
|
|
.install_recorder()
|
|
.map_err(|e| format!("Failed to install Prometheus recorder: {e}"))?;
|
|
let prometheus_handle = Arc::new(prometheus_handle);
|
|
info!("Prometheus metrics recorder initialized");
|
|
|
|
let config = Config::from_env();
|
|
|
|
info!("Starting Episteme (StemeDB) API server");
|
|
info!(?config, "Configuration loaded");
|
|
|
|
// Ensure directories exist
|
|
std::fs::create_dir_all(&config.wal_dir)?;
|
|
std::fs::create_dir_all(&config.db_dir)?;
|
|
|
|
// Open write Journal (owned by GroupCommitBuffer)
|
|
info!("Opening write Journal at {:?}", config.wal_dir);
|
|
let write_journal = Journal::open(&config.wal_dir)?;
|
|
|
|
// Open read Journal (for IngestWorker to tail)
|
|
info!("Opening read Journal at {:?}", config.wal_dir);
|
|
let read_journal = Journal::open(&config.wal_dir)?;
|
|
|
|
info!("Opening HybridStore at {:?}", config.db_dir);
|
|
let store = Arc::new(HybridStore::open(&config.db_dir)?);
|
|
|
|
// Create application state (initializes GroupCommitBuffer)
|
|
let state = AppState::new(write_journal, read_journal, Arc::clone(&store));
|
|
|
|
// Spawn IngestWorker background task (uses read journal)
|
|
info!("Spawning IngestWorker background task");
|
|
let worker_journal = state.journal.clone();
|
|
let worker_store = store;
|
|
tokio::spawn(async move {
|
|
let worker_result = IngestWorker::new(worker_journal, worker_store).await;
|
|
match worker_result {
|
|
Ok(mut worker) => {
|
|
info!("IngestWorker started, entering run loop");
|
|
worker.run().await;
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to create IngestWorker: {:?}", e);
|
|
}
|
|
}
|
|
});
|
|
|
|
// Build router (with or without metering)
|
|
let app = if config.meter_enabled {
|
|
info!("The Meter enabled: economic throttling active (10K tokens/agent/hour)");
|
|
create_router_with_meter(state)
|
|
} else {
|
|
info!("The Meter disabled: no quota enforcement");
|
|
create_router(state)
|
|
};
|
|
|
|
// Add Prometheus handle extension and /metrics route
|
|
let app = app.layer(Extension(prometheus_handle));
|
|
|
|
// Start server
|
|
let listener = tokio::net::TcpListener::bind(&config.bind_addr).await?;
|
|
info!("API server listening on {}", config.bind_addr);
|
|
info!("Swagger UI available at http://{}/swagger-ui", config.bind_addr);
|
|
|
|
axum::serve(listener, app).await?;
|
|
|
|
Ok(())
|
|
}
|