diff --git a/Cargo.lock b/Cargo.lock index 50bdf26..1bb7cc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3324,9 +3324,12 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "subtle", "thiserror 2.0.18", "tidaldb", "tokio", + "tower", + "tower-http 0.6.8", "tracing", "tracing-subscriber", ] @@ -3517,6 +3520,7 @@ dependencies = [ "pin-project-lite", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3560,9 +3564,12 @@ dependencies = [ "http-body", "iri-string", "pin-project-lite", + "tokio", "tower", "tower-layer", "tower-service", + "tracing", + "uuid", ] [[package]] diff --git a/docker/cluster/Dockerfile b/docker/cluster/Dockerfile index 93a1ad0..8f14d29 100644 --- a/docker/cluster/Dockerfile +++ b/docker/cluster/Dockerfile @@ -1,3 +1,9 @@ +# LEGACY: This file was originally a simulated multi-region cluster image. +# The cluster mode has been removed from tidal-server. This Dockerfile now +# builds an identical standalone image and is preserved only to avoid breaking +# existing CI references. +# +# For new deployments use docker/standalone/Dockerfile instead. 
FROM rust:1.91 as builder WORKDIR /app @@ -19,12 +25,16 @@ RUN cargo build -p tidal-server --release FROM debian:bookworm-slim WORKDIR /srv RUN useradd --system --home /srv tidal && \ - apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + apt-get update && apt-get install -y ca-certificates curl && \ + rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/tidal-server /usr/local/bin/tidal-server COPY tidal-server/config /etc/tidal-server USER tidal -EXPOSE 9500 +EXPOSE 9400 -ENTRYPOINT ["tidal-server", "cluster", "--listen", "0.0.0.0:9500", "--schema", "/etc/tidal-server/default-schema.yaml", "--topology", "/etc/tidal-server/default-cluster.yaml"] +HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ + CMD curl -f -H "Authorization: Bearer ${TIDAL_API_KEY:-}" http://localhost:9400/health || exit 1 + +ENTRYPOINT ["tidal-server", "standalone", "--listen", "0.0.0.0:9400"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..c3d4f38 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,31 @@ +services: + tidaldb: + build: + context: .. 
+ dockerfile: docker/standalone/Dockerfile + ports: + - "9400:9400" + - "9091:9091" + environment: + - TIDAL_API_KEY=${TIDAL_API_KEY} + - TIDAL_SERVER_LOG=info + volumes: + - tidaldb-data:/var/lib/tidaldb + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "-H", "Authorization: Bearer ${TIDAL_API_KEY}", "http://localhost:9400/health"] + interval: 30s + timeout: 5s + retries: 3 + + prometheus: + image: prom/prometheus:v2.53.0 + ports: + - "9090:9090" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + depends_on: + - tidaldb + +volumes: + tidaldb-data: diff --git a/docker/prometheus.yml b/docker/prometheus.yml new file mode 100644 index 0000000..4110900 --- /dev/null +++ b/docker/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: tidaldb + static_configs: + - targets: ['tidaldb:9091'] diff --git a/docker/standalone/Dockerfile b/docker/standalone/Dockerfile new file mode 100644 index 0000000..8c84fd3 --- /dev/null +++ b/docker/standalone/Dockerfile @@ -0,0 +1,35 @@ +FROM rust:1.91 AS builder +WORKDIR /app + +# Copy workspace manifests first for layer caching. +COPY Cargo.toml Cargo.lock ./ +COPY tidal/Cargo.toml tidal/Cargo.toml +COPY tidalctl/Cargo.toml tidalctl/Cargo.toml +COPY tidal-server/Cargo.toml tidal-server/Cargo.toml +COPY applications/forage/engine/Cargo.toml applications/forage/engine/Cargo.toml +COPY applications/forage/server/Cargo.toml applications/forage/server/Cargo.toml +COPY applications/forage/embedder/Cargo.toml applications/forage/embedder/Cargo.toml +COPY applications/iknowyou/engine/Cargo.toml applications/iknowyou/engine/Cargo.toml + +# Copy full workspace and build. +COPY . . 
+RUN cargo build -p tidal-server --release + +FROM debian:bookworm-slim +WORKDIR /srv +RUN useradd --system --home /srv tidal && \ + apt-get update && apt-get install -y ca-certificates curl && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/tidal-server /usr/local/bin/tidal-server +COPY tidal-server/config /etc/tidal-server + +USER tidal +EXPOSE 9400 9091 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ + CMD curl -f -H "Authorization: Bearer ${TIDAL_API_KEY:-}" http://localhost:9400/health || exit 1 + +ENTRYPOINT ["tidal-server", "standalone", \ + "--listen", "0.0.0.0:9400", \ + "--metrics", "0.0.0.0:9091"] diff --git a/docs/ops/grafana-dashboard.json b/docs/ops/grafana-dashboard.json new file mode 100644 index 0000000..b8d4ff1 --- /dev/null +++ b/docs/ops/grafana-dashboard.json @@ -0,0 +1,523 @@ +{ + "uid": "tidaldb-overview", + "title": "tidalDB Overview", + "description": "Operational dashboard covering all 20 tidalDB metrics including retrieve and search latency histograms.", + "schemaVersion": 38, + "version": 2, + "refresh": "30s", + "time": { "from": "now-1h", "to": "now" }, + "timepicker": {}, + "tags": ["tidaldb"], + "panels": [ + { + "id": 1, + "type": "row", + "title": "Health Overview", + "gridPos": { "x": 0, "y": 0, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 2, + "type": "stat", + "title": "Health", + "gridPos": { "x": 0, "y": 1, "w": 4, "h": 4 }, + "targets": [ + { + "expr": "tidaldb_health_ok", + "legendFormat": "health_ok" + } + ], + "fieldConfig": { + "defaults": { + "mappings": [ + { "type": "value", "options": { "0": { "text": "UNHEALTHY", "color": "red" } } }, + { "type": "value", "options": { "1": { "text": "OK", "color": "green" } } } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "red", "value": 0 }, + { "color": "green", "value": 1 } + ] + }, + "color": { "mode": "thresholds" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, 
"orientation": "auto", "colorMode": "background" } + }, + { + "id": 3, + "type": "stat", + "title": "Uptime", + "gridPos": { "x": 4, "y": 1, "w": 4, "h": 4 }, + "targets": [ + { + "expr": "tidaldb_uptime_seconds", + "legendFormat": "uptime_seconds" + } + ], + "fieldConfig": { + "defaults": { + "unit": "s", + "color": { "mode": "fixed", "fixedColor": "blue" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "orientation": "auto", "colorMode": "value" } + }, + { + "id": 4, + "type": "stat", + "title": "Degradation Level", + "gridPos": { "x": 8, "y": 1, "w": 4, "h": 4 }, + "targets": [ + { + "expr": "tidaldb_degradation_level", + "legendFormat": "degradation_level" + } + ], + "fieldConfig": { + "defaults": { + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "red", "value": 1 } + ] + }, + "color": { "mode": "thresholds" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "orientation": "auto", "colorMode": "background" } + }, + { + "id": 5, + "type": "stat", + "title": "Version", + "gridPos": { "x": 12, "y": 1, "w": 4, "h": 4 }, + "targets": [ + { + "expr": "tidaldb_info", + "legendFormat": "{{version}}" + } + ], + "fieldConfig": { + "defaults": { + "color": { "mode": "fixed", "fixedColor": "text" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "orientation": "auto", "colorMode": "none", "textMode": "name" } + }, + + { + "id": 10, + "type": "row", + "title": "Signal Throughput", + "gridPos": { "x": 0, "y": 5, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 11, + "type": "timeseries", + "title": "Signal Write Rate (per second)", + "gridPos": { "x": 0, "y": 6, "w": 8, "h": 6 }, + "targets": [ + { + "expr": "rate(tidaldb_signal_writes_total[5m])", + "legendFormat": "writes/s" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps", + "color": { "mode": "palette-classic" } + } + } + }, + { + "id": 12, + "type": "timeseries", + "title": 
"Signal Write Latency (µs)", + "gridPos": { "x": 8, "y": 6, "w": 8, "h": 6 }, + "targets": [ + { + "expr": "tidaldb_signal_write_latency_us", + "legendFormat": "latency_us" + } + ], + "fieldConfig": { + "defaults": { + "unit": "µs", + "color": { "mode": "palette-classic" } + } + } + }, + { + "id": 13, + "type": "gauge", + "title": "Signal Hot Entries", + "gridPos": { "x": 16, "y": 6, "w": 8, "h": 6 }, + "targets": [ + { + "expr": "tidaldb_signal_hot_entries", + "legendFormat": "hot_entries" + } + ], + "fieldConfig": { + "defaults": { + "min": 0, + "max": 5000000, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "yellow", "value": 4000000 }, + { "color": "red", "value": 5000000 } + ] + }, + "color": { "mode": "thresholds" } + } + } + }, + + { + "id": 14, + "type": "timeseries", + "title": "Retrieve Latency Percentiles (µs)", + "gridPos": { "x": 0, "y": 12, "w": 12, "h": 6 }, + "targets": [ + { + "expr": "histogram_quantile(0.50, rate(tidaldb_retrieve_latency_us_bucket[$__rate_interval]))", + "legendFormat": "p50" + }, + { + "expr": "histogram_quantile(0.95, rate(tidaldb_retrieve_latency_us_bucket[$__rate_interval]))", + "legendFormat": "p95" + }, + { + "expr": "histogram_quantile(0.99, rate(tidaldb_retrieve_latency_us_bucket[$__rate_interval]))", + "legendFormat": "p99" + } + ], + "fieldConfig": { + "defaults": { + "unit": "µs", + "color": { "mode": "palette-classic" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "yellow", "value": 500000 } + ] + } + } + } + }, + { + "id": 15, + "type": "timeseries", + "title": "Search Latency Percentiles (µs)", + "gridPos": { "x": 12, "y": 12, "w": 12, "h": 6 }, + "targets": [ + { + "expr": "histogram_quantile(0.50, rate(tidaldb_search_latency_us_bucket[$__rate_interval]))", + "legendFormat": "p50" + }, + { + "expr": "histogram_quantile(0.95, rate(tidaldb_search_latency_us_bucket[$__rate_interval]))", + "legendFormat": 
"p95" + }, + { + "expr": "histogram_quantile(0.99, rate(tidaldb_search_latency_us_bucket[$__rate_interval]))", + "legendFormat": "p99" + } + ], + "fieldConfig": { + "defaults": { + "unit": "µs", + "color": { "mode": "palette-classic" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "yellow", "value": 1000000 } + ] + } + } + } + }, + + { + "id": 20, + "type": "row", + "title": "Durability", + "gridPos": { "x": 0, "y": 19, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 21, + "type": "timeseries", + "title": "Checkpoint Age (seconds)", + "gridPos": { "x": 0, "y": 20, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "tidaldb_checkpoint_age_seconds", + "legendFormat": "checkpoint_age_seconds" + } + ], + "fieldConfig": { + "defaults": { + "unit": "s", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "red", "value": 300 } + ] + }, + "color": { "mode": "thresholds" } + } + } + }, + { + "id": 22, + "type": "stat", + "title": "Checkpoint Failures", + "gridPos": { "x": 6, "y": 20, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "tidaldb_checkpoint_failures_total", + "legendFormat": "checkpoint_failures_total" + } + ], + "fieldConfig": { + "defaults": { + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "red", "value": 1 } + ] + }, + "color": { "mode": "thresholds" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "background" } + }, + { + "id": 23, + "type": "timeseries", + "title": "WAL Lag (bytes)", + "gridPos": { "x": 12, "y": 20, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "tidaldb_wal_lag_bytes", + "legendFormat": "wal_lag_bytes" + } + ], + "fieldConfig": { + "defaults": { + "unit": "bytes", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "yellow", "value": 500000000 }, + { "color": "red", "value": 1000000000 } + ] 
+ }, + "color": { "mode": "palette-classic" } + } + } + }, + { + "id": 24, + "type": "timeseries", + "title": "WAL Compacted Segments (rate)", + "gridPos": { "x": 18, "y": 20, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "rate(tidaldb_wal_compacted_segments_total[5m])", + "legendFormat": "compacted/s" + } + ], + "fieldConfig": { + "defaults": { + "unit": "cps", + "color": { "mode": "palette-classic" } + } + } + }, + + { + "id": 30, + "type": "row", + "title": "Index Health", + "gridPos": { "x": 0, "y": 26, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 31, + "type": "stat", + "title": "Tantivy Indexed Docs", + "gridPos": { "x": 0, "y": 27, "w": 4, "h": 4 }, + "targets": [ + { "expr": "tidaldb_tantivy_indexed_docs", "legendFormat": "indexed_docs" } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "fixed", "fixedColor": "blue" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value" } + }, + { + "id": 32, + "type": "gauge", + "title": "Tantivy Segment Count", + "gridPos": { "x": 4, "y": 27, "w": 4, "h": 4 }, + "targets": [ + { "expr": "tidaldb_tantivy_segment_count", "legendFormat": "segment_count" } + ], + "fieldConfig": { + "defaults": { + "min": 0, + "max": 50, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "yellow", "value": 20 }, + { "color": "red", "value": 30 } + ] + }, + "color": { "mode": "thresholds" } + } + } + }, + { + "id": 33, + "type": "stat", + "title": "uSearch Vector Count", + "gridPos": { "x": 8, "y": 27, "w": 4, "h": 4 }, + "targets": [ + { "expr": "tidaldb_usearch_vector_count", "legendFormat": "vector_count" } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "fixed", "fixedColor": "blue" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value" } + }, + { + "id": 34, + "type": "stat", + "title": "uSearch Index Size", + "gridPos": { "x": 12, "y": 
27, "w": 4, "h": 4 }, + "targets": [ + { "expr": "tidaldb_usearch_index_size_bytes", "legendFormat": "index_size_bytes" } + ], + "fieldConfig": { + "defaults": { + "unit": "bytes", + "color": { "mode": "fixed", "fixedColor": "blue" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value" } + }, + { + "id": 35, + "type": "stat", + "title": "Bitmap Index Cardinality", + "gridPos": { "x": 16, "y": 27, "w": 4, "h": 4 }, + "targets": [ + { "expr": "tidaldb_bitmap_index_cardinality", "legendFormat": "bitmap_cardinality" } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "fixed", "fixedColor": "blue" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value" } + }, + + { + "id": 40, + "type": "row", + "title": "Sessions", + "gridPos": { "x": 0, "y": 31, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 41, + "type": "timeseries", + "title": "Active Sessions", + "gridPos": { "x": 0, "y": 32, "w": 6, "h": 6 }, + "targets": [ + { "expr": "tidaldb_active_sessions", "legendFormat": "active_sessions" } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "palette-classic" } + } + } + }, + { + "id": 42, + "type": "timeseries", + "title": "Session Close Rate (per second)", + "gridPos": { "x": 6, "y": 32, "w": 6, "h": 6 }, + "targets": [ + { "expr": "rate(tidaldb_closed_sessions_total[5m])", "legendFormat": "closes/s" } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps", + "color": { "mode": "palette-classic" } + } + } + }, + { + "id": 43, + "type": "stat", + "title": "Auto-Closed Sessions", + "gridPos": { "x": 12, "y": 32, "w": 4, "h": 6 }, + "targets": [ + { "expr": "tidaldb_session_auto_closed_total", "legendFormat": "auto_closed_total" } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "fixed", "fixedColor": "yellow" } + } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": 
"value" } + }, + { + "id": 44, + "type": "timeseries", + "title": "Rate Limited (per second)", + "gridPos": { "x": 16, "y": 32, "w": 8, "h": 6 }, + "targets": [ + { "expr": "rate(tidaldb_rate_limited_total[5m])", "legendFormat": "rate_limited/s" } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": 0 }, + { "color": "red", "value": 100 } + ] + }, + "color": { "mode": "palette-classic" } + } + } + } + ] +} diff --git a/docs/ops/prometheus-alerts.yaml b/docs/ops/prometheus-alerts.yaml new file mode 100644 index 0000000..3bbea55 --- /dev/null +++ b/docs/ops/prometheus-alerts.yaml @@ -0,0 +1,90 @@ +groups: + - name: tidaldb + interval: 30s + rules: + - alert: TidalDBDown + expr: tidaldb_health_ok == 0 + for: 1m + labels: { severity: critical } + annotations: + summary: "tidalDB is unhealthy" + description: "tidaldb_health_ok is 0 — database is unhealthy or shut down." + + - alert: TidalDBCheckpointStale + expr: tidaldb_checkpoint_age_seconds > 300 + for: 2m + labels: { severity: warning } + annotations: + summary: "Signal checkpoint not running" + description: "{{ $value }}s since last checkpoint (threshold: 300s). Signal durability at risk." + + - alert: TidalDBCheckpointFailures + expr: increase(tidaldb_checkpoint_failures_total[5m]) > 0 + labels: { severity: warning } + annotations: + summary: "Signal checkpoint failures detected" + description: "Checkpoint failures in last 5m. Check disk space and storage errors." + + - alert: TidalDBWALDiskPressure + expr: tidaldb_wal_lag_bytes > 1000000000 + for: 5m + labels: { severity: warning } + annotations: + summary: "WAL disk usage exceeds 1GB" + description: "{{ $value | humanize1024 }}B of WAL uncompacted. Compaction may be stuck." 
+ + - alert: TidalDBSignalBacklog + expr: tidaldb_signal_hot_entries > 4000000 + for: 5m + labels: { severity: warning } + annotations: + summary: "Signal ledger over 80% of capacity" + description: "{{ $value }} hot entries (threshold: 4M / 80% of 5M budget)." + + - alert: TidalDBDegradedRanking + expr: tidaldb_degradation_level > 0 + for: 2m + labels: { severity: warning } + annotations: + summary: "Ranking quality degraded" + description: "Degradation level {{ $value }} active. Scale up or reduce load." + + - alert: TidalDBSessionLeak + expr: tidaldb_active_sessions > 100 and deriv(tidaldb_active_sessions[5m]) > 10 # gauge: deriv(), not rate(); left operand supplies $value + for: 5m + labels: { severity: warning } + annotations: + summary: "Active session count growing rapidly" + description: "{{ $value }} active sessions and growing. Agents may not be closing sessions." + + - alert: TidalDBHighRateLimiting + expr: rate(tidaldb_rate_limited_total[5m]) > 100 + for: 5m + labels: { severity: info } + annotations: + summary: "Sustained rate limiting" + description: "{{ $value }}/s rate-limited writes. Review agent rate limit config." + + - alert: TidalDBTantivySegmentBloat + expr: tidaldb_tantivy_segment_count > 30 + for: 10m + labels: { severity: warning } + annotations: + summary: "Tantivy segment count elevated" + description: "{{ $value }} segments (threshold: 30). Text syncer may be stalled." + + - alert: TidalDBSlowRetrieve + expr: histogram_quantile(0.95, rate(tidaldb_retrieve_latency_us_bucket[5m])) / 1e6 > 0.5 # buckets are in µs; convert to seconds for humanizeDuration + for: 5m + labels: { severity: warning } + annotations: + summary: "Retrieve p95 latency exceeds 500ms" + description: "p95 retrieve latency is {{ $value | humanizeDuration }}. Check signal ledger load and degradation level." + + - alert: TidalDBSlowSearch + expr: histogram_quantile(0.95, rate(tidaldb_search_latency_us_bucket[5m])) / 1e6 > 1 # buckets are in µs; convert to seconds for humanizeDuration + for: 5m + labels: { severity: warning } + annotations: + summary: "Search p95 latency exceeds 1s" + description: "p95 search latency is {{ $value | humanizeDuration }}. 
Check Tantivy segment count and ANN index health." diff --git a/tidal-server/Cargo.toml b/tidal-server/Cargo.toml index e80dcbe..a6171c7 100644 --- a/tidal-server/Cargo.toml +++ b/tidal-server/Cargo.toml @@ -5,9 +5,16 @@ edition.workspace = true rust-version.workspace = true license.workspace = true +[lib] +name = "tidal_server" +path = "src/lib.rs" + [dependencies] axum = "0.8" clap = { version = "4.5", features = ["derive"] } +subtle = "2" +tower = { version = "0.5", features = ["limit"] } +tower-http = { version = "0.6", features = ["timeout", "trace", "request-id"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" @@ -15,4 +22,4 @@ thiserror = "2" tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tidaldb = { path = "../tidal", features = ["test-utils"] } +tidaldb = { path = "../tidal" } diff --git a/tidal-server/src/config.rs b/tidal-server/src/config.rs index 3f9b047..933b821 100644 --- a/tidal-server/src/config.rs +++ b/tidal-server/src/config.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::fs; use std::path::Path; use std::time::Duration; @@ -9,13 +8,6 @@ use tidaldb::schema::{DecaySpec, EntityKind, Schema, SchemaBuilder, TextFieldTyp use crate::error::{Result, ServerError}; const DEFAULT_SCHEMA_YAML: &str = include_str!("../config/default-schema.yaml"); -const DEFAULT_CLUSTER_YAML: &str = include_str!("../config/default-cluster.yaml"); - -#[derive(Debug)] -pub struct ClusterLayout { - pub regions: Vec, - pub leader: String, -} pub fn load_schema(path: Option<&Path>) -> Result { let raw = read_config(path, DEFAULT_SCHEMA_YAML)?; @@ -64,40 +56,6 @@ pub fn load_schema(path: Option<&Path>) -> Result { builder.build().map_err(ServerError::SchemaBuild) } -pub fn load_cluster_layout(path: Option<&Path>) -> Result { - let raw = read_config(path, DEFAULT_CLUSTER_YAML)?; - let spec: ClusterSpec = 
serde_yaml::from_str(&raw) - .map_err(|e| ServerError::ClusterConfig(format!("parse cluster yaml: {e}")))?; - - if spec.regions.is_empty() { - return Err(ServerError::ClusterConfig( - "cluster config must include at least one region".into(), - )); - } - - let mut seen = HashSet::new(); - for region in &spec.regions { - if !seen.insert(region.name.to_lowercase()) { - return Err(ServerError::ClusterConfig(format!( - "duplicate region name: {}", - region.name - ))); - } - } - - if !seen.contains(&spec.leader.to_lowercase()) { - return Err(ServerError::ClusterConfig(format!( - "leader '{}' not found in region list", - spec.leader - ))); - } - - Ok(ClusterLayout { - regions: spec.regions.into_iter().map(|r| r.name).collect(), - leader: spec.leader, - }) -} - fn read_config(path: Option<&Path>, fallback: &str) -> Result { match path { Some(p) => fs::read_to_string(p).map_err(|e| ServerError::io(p, e)), @@ -180,17 +138,6 @@ struct EmbeddingSpec { dimensions: usize, } -#[derive(Debug, Deserialize)] -struct ClusterSpec { - regions: Vec, - leader: String, -} - -#[derive(Debug, Deserialize)] -struct RegionSpec { - name: String, -} - fn parse_entity_kind(input: &str) -> Result { match input.trim().to_lowercase().as_str() { "item" | "items" => Ok(EntityKind::Item), diff --git a/tidal-server/src/error.rs b/tidal-server/src/error.rs index 0c51d1a..12b7dbb 100644 --- a/tidal-server/src/error.rs +++ b/tidal-server/src/error.rs @@ -15,8 +15,6 @@ pub enum ServerError { SchemaConfig(String), #[error("schema build failed: {0}")] SchemaBuild(#[from] tidaldb::schema::SchemaError), - #[error("invalid cluster config: {0}")] - ClusterConfig(String), #[error("tidalDB error: {0}")] Tidal(#[from] tidaldb::TidalError), #[error("http server error: {0}")] diff --git a/tidal-server/src/lib.rs b/tidal-server/src/lib.rs new file mode 100644 index 0000000..e8c9281 --- /dev/null +++ b/tidal-server/src/lib.rs @@ -0,0 +1,8 @@ +//! Public modules exposed for integration testing. 
+//! +//! The binary (`main.rs`) imports from this lib crate rather than declaring +//! the modules directly, so integration tests in `tests/` can access them. +pub mod config; +pub mod error; +pub mod router; +pub mod state; diff --git a/tidal-server/src/main.rs b/tidal-server/src/main.rs index d31908b..627b1dc 100644 --- a/tidal-server/src/main.rs +++ b/tidal-server/src/main.rs @@ -1,20 +1,14 @@ -mod config; -mod error; -mod router; -mod state; - use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use clap::{Args, Parser, Subcommand}; +use tidal_server::config::load_schema; +use tidal_server::error::{Result, ServerError}; +use tidal_server::router::build_router; +use tidal_server::state::ServerState; use tidaldb::TidalDb; -use crate::config::{load_cluster_layout, load_schema}; -use crate::error::{Result, ServerError}; -use crate::router::build_router; -use crate::state::ServerState; - #[derive(Parser)] #[command(version, about = "HTTP wrapper for tidalDB")] struct Cli { @@ -26,8 +20,6 @@ struct Cli { enum Command { #[command(about = "Run a single-node server wrapping one tidalDB instance")] Standalone(StandaloneArgs), - #[command(about = "Run the simulated multi-region cluster server")] - Cluster(ClusterArgs), } #[derive(Args)] @@ -38,16 +30,11 @@ struct StandaloneArgs { schema: Option, #[arg(long)] data_dir: Option, -} - -#[derive(Args)] -struct ClusterArgs { - #[arg(long, default_value = "0.0.0.0:9500")] - listen: String, - #[arg(long)] - schema: Option, - #[arg(long)] - topology: Option, + #[arg( + long, + help = "Bind address for Prometheus /metrics endpoint (e.g. 
127.0.0.1:9091)" + )] + metrics: Option, } #[tokio::main] @@ -64,7 +51,6 @@ async fn run() -> Result<()> { match cli.mode { Command::Standalone(args) => run_standalone(args).await, - Command::Cluster(args) => run_cluster(args).await, } } @@ -84,20 +70,39 @@ async fn run_standalone(args: StandaloneArgs) -> Result<()> { } else { builder = builder.ephemeral(); } + if let Some(ref addr) = args.metrics { + builder = builder.enable_metrics(addr); + } let db = builder.open()?; - let state = ServerState::standalone(db); - serve(state, &args.listen).await + if let Some(addr) = db.metrics_addr() { + tracing::info!("metrics endpoint listening on http://{addr}/metrics"); + } + let state = ServerState::new(db); + + let api_key = read_api_key(); + serve(state, &args.listen, api_key).await } -async fn run_cluster(args: ClusterArgs) -> Result<()> { - let schema = load_schema(args.schema.as_deref())?; - let layout = load_cluster_layout(args.topology.as_deref())?; - let state = ServerState::cluster(schema, layout)?; - serve(state, &args.listen).await +/// Read the API key from the environment. +/// +/// If `TIDAL_API_KEY` is not set, all requests are accepted without +/// authentication. This is appropriate for local development but should +/// never be used in production. A startup warning is emitted. +fn read_api_key() -> Option> { + match std::env::var("TIDAL_API_KEY") { + Ok(key) if !key.is_empty() => Some(Arc::from(key.as_str())), + _ => { + tracing::warn!( + "TIDAL_API_KEY is not set — all endpoints are unauthenticated. \ + Set this variable before exposing the server to any network." 
+ ); + None + } + } } -async fn serve(state: ServerState, addr: &str) -> Result<()> { +async fn serve(state: ServerState, addr: &str, api_key: Option>) -> Result<()> { let socket: SocketAddr = addr .parse() .map_err(|e| ServerError::BadRequest(format!("invalid addr: {e}")))?; @@ -106,15 +111,40 @@ async fn serve(state: ServerState, addr: &str) -> Result<()> { let actual = listener.local_addr()?; tracing::info!("listening on http://{actual}"); - axum::serve(listener, build_router(Arc::new(state))) + axum::serve(listener, build_router(Arc::new(state), api_key)) .with_graceful_shutdown(shutdown_signal()) .await?; Ok(()) } async fn shutdown_signal() { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::warn!("ctrl-c handler error: {err}"); + // SIGTERM is Unix-only; on other platforms we fall back to ctrl-c alone. + #[cfg(unix)] + let sigterm = async { + use tokio::signal::unix::{SignalKind, signal}; + match signal(SignalKind::terminate()) { + Ok(mut stream) => { + stream.recv().await; + } + Err(err) => { + tracing::warn!("SIGTERM handler registration failed: {err}"); + // If we cannot register SIGTERM, park this branch so the + // select falls back to ctrl_c exclusively. + std::future::pending::<()>().await; + } + } + }; + #[cfg(not(unix))] + let sigterm = std::future::pending::<()>(); + + tokio::select! 
{ + result = tokio::signal::ctrl_c() => { + if let Err(err) = result { + tracing::warn!("ctrl-c handler error: {err}"); + } + } + _ = sigterm => {} } + tracing::info!("shutdown signal received"); } diff --git a/tidal-server/src/router.rs b/tidal-server/src/router.rs index 20cee2b..827663a 100644 --- a/tidal-server/src/router.rs +++ b/tidal-server/src/router.rs @@ -1,38 +1,178 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use axum::Json; use axum::Router; -use axum::extract::{Query, State}; +use axum::extract::{Query, Request, State}; use axum::http::StatusCode; +use axum::http::header::AUTHORIZATION; +use axum::middleware::{self, Next}; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use serde::{Deserialize, Serialize}; +use subtle::ConstantTimeEq; use tidaldb::query::retrieve::Retrieve; use tidaldb::query::search::Search; use tidaldb::schema::EntityId; +use tower::ServiceBuilder; +use tower::limit::GlobalConcurrencyLimitLayer; +use tower_http::request_id::{ + MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer, +}; +use tower_http::timeout::TimeoutLayer; +use tower_http::trace::TraceLayer; use crate::error::{Result, ServerError}; -use crate::state::{ClusterHealth, ServerState}; +use crate::state::ServerState; -pub fn build_router(state: Arc<ServerState>) -> Router { - let mut app = Router::new() +/// Maximum request body size. Requests exceeding this are rejected with 413 +/// before any deserialization occurs. +const BODY_LIMIT_BYTES: usize = 2 * 1024 * 1024; + +/// Maximum wall-clock time a single request may occupy. Exceeded requests +/// receive 408 Request Timeout. +const REQUEST_TIMEOUT_SECS: u64 = 30; + +/// Maximum number of protected requests in flight across all routes combined. +/// Additional requests are queued by the concurrency layer until a slot opens. 
+const MAX_CONCURRENCY: usize = 100; + +/// Sequential request ID generator — assigns monotonically increasing IDs. +#[derive(Clone, Default)] +struct SequentialRequestId(Arc<AtomicU64>); + +impl MakeRequestId for SequentialRequestId { + fn make_request_id<B>(&mut self, _: &Request<B>) -> Option<RequestId> { + // Relaxed ordering is sufficient: we only need unique IDs, not + // cross-thread happens-before ordering. + let id = self.0.fetch_add(1, Ordering::Relaxed); + axum::http::HeaderValue::from_str(&id.to_string()) + .ok() + .map(RequestId::new) + } +} + +/// Build the application router. +/// +/// Routes are split into two groups: +/// - **Public** (`/health`) — never requires auth; safe for liveness/readiness probes. +/// - **Protected** — require `Authorization: Bearer <token>` when `api_key` is `Some`. +/// +/// Global middleware stack (applied to all routes, outermost → innermost): +/// 1. `SetRequestIdLayer` — assigns sequential `x-request-id` before the span is created +/// 2. `PropagateRequestIdLayer` — echoes `x-request-id` into the response +/// 3. `TraceLayer` — creates a structured span; reads the ID set in step 1 +/// +/// Protected-only middleware (not applied to `/health`): +/// 4. `TimeoutLayer` — returns 408 for requests exceeding [`REQUEST_TIMEOUT_SECS`] +/// 5. `GlobalConcurrencyLimitLayer` — queues beyond [`MAX_CONCURRENCY`] in-flight requests (one limit shared by all protected routes) +/// +/// Keeping `/health` outside the timeout/concurrency layers means health probes +/// are never queued or timed out under saturation, preventing false liveness failures. +pub fn build_router(state: Arc<ServerState>, api_key: Option<Arc<str>>) -> Router { + // Public routes — exempt from auth so health probes always work. + let public = Router::new() + .route("/health", get(health)) + .with_state(Arc::clone(&state)); + + // Protected routes — gated by Bearer token when a key is configured. 
+ let protected = Router::new() .route("/items", post(create_item)) .route("/embeddings", post(write_embedding)) .route("/signals", post(write_signal)) .route("/feed", get(feed)) .route("/search", get(search)) - .route("/health", get(health)); + .layer(axum::extract::DefaultBodyLimit::max(BODY_LIMIT_BYTES)) + .with_state(state); - if state.is_cluster() { - app = app - .route("/cluster/status", get(cluster_status)) - .route("/cluster/promote", post(promote_leader)) - .route("/cluster/partition", post(partition_region)) - .route("/cluster/heal", post(heal_region)); + let protected = match api_key { + Some(key) => protected.layer(middleware::from_fn(move |req: Request, next: Next| { + let key = key.clone(); + async move { bearer_auth(req, next, &key).await } + })), + None => protected, + }; + + // Timeout and concurrency apply only to protected routes so /health is never queued or dropped. + // The limit must be the global variant: Router::layer wraps every route in its own layer instance. + let protected = protected.layer( + ServiceBuilder::new() + .layer(TimeoutLayer::with_status_code( + StatusCode::REQUEST_TIMEOUT, + Duration::from_secs(REQUEST_TIMEOUT_SECS), + )) + .layer(GlobalConcurrencyLimitLayer::new(MAX_CONCURRENCY)), + ); + + // SetRequestId must be outermost so the ID is in headers when TraceLayer + // creates its span. In ServiceBuilder the first .layer() is outermost. + public.merge(protected).layer( + ServiceBuilder::new() + .layer(SetRequestIdLayer::x_request_id( + SequentialRequestId::default(), + )) + .layer(PropagateRequestIdLayer::x_request_id()) + .layer( + TraceLayer::new_for_http().make_span_with(|req: &Request<_>| { + // x-request-id is guaranteed present: SetRequestIdLayer runs first. + let id = req + .headers() + .get("x-request-id") + .and_then(|v| v.to_str().ok()) + .unwrap_or("-"); + tracing::info_span!( + "request", + method = %req.method(), + uri = %req.uri(), + request_id = %id, + ) + }), + ), + ) +} + +/// Validate an `Authorization: Bearer <token>` header. 
+/// +/// Returns `401 Unauthorized` with a `WWW-Authenticate: Bearer` header if the +/// token is absent or does not match the expected key. The comparison uses +/// constant-time equality to prevent timing-based token reconstruction. +async fn bearer_auth(request: Request, next: Next, expected_key: &str) -> Response { + // RFC 7235 §2.1: auth-scheme tokens are case-insensitive. + let token = request + .headers() + .get(AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .and_then(|s| { + // "Bearer " is 7 bytes; check case-insensitively then slice the token. + if s.len() > 7 && s[..7].eq_ignore_ascii_case("bearer ") { + Some(&s[7..]) + } else { + None + } + }); + + let authorized = match token { + Some(t) => { + let a = t.as_bytes(); + let b = expected_key.as_bytes(); + // Length mismatch does not leak token content; short-circuit is safe. + a.len() == b.len() && bool::from(a.ct_eq(b)) + } + None => false, + }; + + if authorized { + next.run(request).await + } else { + ( + StatusCode::UNAUTHORIZED, + [("www-authenticate", "Bearer")], + Json(serde_json::json!({"error": "missing or invalid api key"})), + ) + .into_response() } - - app.with_state(state) } #[derive(Deserialize)] @@ -216,9 +356,7 @@ async fn search( State(state): State>, Query(query): Query, ) -> Result, AppError> { - let mut builder = Search::builder() - .query(&query.query) - .limit(query.limit); + let mut builder = Search::builder().query(&query.query).limit(query.limit); if let Some(user_id) = query.user_id { builder = builder.for_user(user_id); } @@ -262,56 +400,13 @@ async fn health( ) -> Result, AppError> { let region = query.get("region").map(|s| s.as_str()); let items = state.item_count(region).map_err(AppError)?; - let mode = if state.is_cluster() { - "cluster" - } else { - "standalone" - }; Ok(Json(HealthResponse { status: "ok", - mode, + mode: "standalone", items, })) } -async fn cluster_status( - State(state): State>, -) -> Result, AppError> { - state - .cluster_status() - .map(Json) - 
.ok_or_else(|| AppError(ServerError::BadRequest("not in cluster mode".into()))) -} - -#[derive(Deserialize)] -struct RegionCommand { - region: String, -} - -async fn promote_leader( - State(state): State>, - Json(cmd): Json, -) -> Result { - state.promote_leader(&cmd.region).map_err(AppError)?; - Ok(StatusCode::NO_CONTENT) -} - -async fn partition_region( - State(state): State>, - Json(cmd): Json, -) -> Result { - state.partition_region(&cmd.region).map_err(AppError)?; - Ok(StatusCode::NO_CONTENT) -} - -async fn heal_region( - State(state): State>, - Json(cmd): Json, -) -> Result { - state.heal_region(&cmd.region).map_err(AppError)?; - Ok(StatusCode::NO_CONTENT) -} - struct TidalErrorWrapper(tidaldb::TidalError); impl From for AppError { @@ -334,9 +429,7 @@ impl IntoResponse for AppError { fn status_from_error(err: &ServerError) -> StatusCode { match err { - ServerError::BadRequest(_) - | ServerError::SchemaConfig(_) - | ServerError::ClusterConfig(_) => StatusCode::BAD_REQUEST, + ServerError::BadRequest(_) | ServerError::SchemaConfig(_) => StatusCode::BAD_REQUEST, ServerError::Tidal(tidal_err) => match tidal_err { tidaldb::TidalError::NotFound { .. } => StatusCode::NOT_FOUND, tidaldb::TidalError::Schema(_) | tidaldb::TidalError::InvalidInput(_) => { diff --git a/tidal-server/src/state.rs b/tidal-server/src/state.rs index 6f55d23..b63bc66 100644 --- a/tidal-server/src/state.rs +++ b/tidal-server/src/state.rs @@ -1,111 +1,35 @@ use std::collections::HashMap; use std::sync::Arc; +use tidaldb::TidalDb; use tidaldb::query::retrieve::Retrieve; use tidaldb::query::search::Search; -use tidaldb::schema::{EntityId, Schema}; -use tidaldb::replication::RegionId; -use tidaldb::testing::cluster::SimulatedCluster; -use tidaldb::TidalDb; +use tidaldb::schema::EntityId; -use crate::config::ClusterLayout; use crate::error::{Result, ServerError}; +/// Return an error if a region name was provided. +/// +/// Region routing is a cluster-mode concept. 
In standalone mode any non-None +/// region param is a client mistake; return a clear error rather than silently +/// ignoring it. +fn ensure_standalone(region_name: Option<&str>) -> Result<()> { + if region_name.is_some() { + return Err(ServerError::BadRequest( + "region routing requires cluster mode; this server runs standalone".into(), + )); + } + Ok(()) +} + #[derive(Clone)] pub struct ServerState { - mode: Mode, -} - -#[derive(Clone)] -enum Mode { - Standalone(Arc), - Cluster(ClusterState), -} - -#[derive(Clone)] -struct ClusterState { - cluster: Arc, - regions: RegionDirectory, -} - -#[derive(Clone)] -struct RegionDirectory { - #[allow(dead_code)] - default_region: RegionId, - name_to_id: HashMap, - id_to_name: HashMap, -} - -#[derive(Debug, serde::Serialize)] -pub struct ClusterHealth { - pub leader: String, - pub relay_log_len: u64, - pub regions: Vec, -} - -#[derive(Debug, serde::Serialize)] -pub struct RegionStatus { - pub name: String, - pub applied_events: u64, - pub lag_events: i64, - pub partitioned: bool, + db: Arc, } impl ServerState { - pub fn standalone(db: TidalDb) -> Self { - Self { - mode: Mode::Standalone(Arc::new(db)), - } - } - - pub fn cluster(schema: Schema, layout: ClusterLayout) -> Result { - use tidaldb::testing::cluster::ClusterConfig; - - let mut regions = Vec::new(); - for (idx, name) in layout.regions.iter().enumerate() { - regions.push((RegionId(idx as u16), name.clone())); - } - - let leader_id = regions - .iter() - .find(|(_, name)| name.eq_ignore_ascii_case(&layout.leader)) - .map(|(id, _)| *id) - .ok_or_else(|| { - ServerError::ClusterConfig(format!( - "leader '{}' not found in region list", - layout.leader - )) - })?; - - let config = ClusterConfig { - regions: regions.iter().map(|(id, _)| *id).collect(), - leader_region: leader_id, - schema, - }; - let cluster = Arc::new(SimulatedCluster::build(config)); - let mut name_to_id = HashMap::new(); - let mut id_to_name = HashMap::new(); - for (id, name) in &regions { -
name_to_id.insert(name.to_lowercase(), *id); - id_to_name.insert(id.0, name.clone()); - } - - let directory = RegionDirectory { - default_region: leader_id, - name_to_id, - id_to_name, - }; - - Ok(Self { - mode: Mode::Cluster(ClusterState { - cluster, - regions: directory, - }), - }) - } - - pub fn is_cluster(&self) -> bool { - matches!(self.mode, Mode::Cluster(_)) + pub fn new(db: TidalDb) -> Self { + Self { db: Arc::new(db) } } pub fn write_item( @@ -113,24 +37,15 @@ impl ServerState { entity_id: EntityId, metadata: &HashMap, ) -> Result<()> { - match &self.mode { - Mode::Standalone(db) => db - .write_item_with_metadata(entity_id, metadata) - .map_err(ServerError::from), - Mode::Cluster(cluster) => cluster - .cluster - .write_item_with_metadata(entity_id, metadata) - .map_err(ServerError::from), - } + self.db + .write_item_with_metadata(entity_id, metadata) + .map_err(ServerError::from) } pub fn write_embedding(&self, entity_id: EntityId, embedding: &[f32]) -> Result<()> { - match &self.mode { - Mode::Standalone(db) => db - .write_item_embedding(entity_id, embedding) - .map_err(ServerError::from), - Mode::Cluster(cluster) => cluster.cluster.write_item_embedding(entity_id, embedding).map_err(ServerError::from), - } + self.db + .write_item_embedding(entity_id, embedding) + .map_err(ServerError::from) } pub fn signal( @@ -141,37 +56,26 @@ impl ServerState { user_id: Option, creator_id: Option, ) -> Result<()> { - match &self.mode { - Mode::Standalone(db) => { - if user_id.is_some() || creator_id.is_some() { - db.signal_with_context( - signal_name, - entity_id, - weight, - tidaldb::schema::Timestamp::now(), - user_id, - creator_id, - ) - .map_err(ServerError::from) - } else { - db.signal( - signal_name, - entity_id, - weight, - tidaldb::schema::Timestamp::now(), - ) - .map_err(ServerError::from) - } - } - Mode::Cluster(cluster) => { - if user_id.is_some() || creator_id.is_some() { - return Err(ServerError::BadRequest( - "cluster mode currently supports only global 
signals (no user_id/creator_id)".into(), - )); - } - cluster.cluster.write_signal(signal_name, entity_id, weight).map_err(ServerError::from)?; - Ok(()) - } + if user_id.is_some() || creator_id.is_some() { + self.db + .signal_with_context( + signal_name, + entity_id, + weight, + tidaldb::schema::Timestamp::now(), + user_id, + creator_id, + ) + .map_err(ServerError::from) + } else { + self.db + .signal( + signal_name, + entity_id, + weight, + tidaldb::schema::Timestamp::now(), + ) + .map_err(ServerError::from) } } @@ -180,34 +84,19 @@ impl ServerState { region_name: Option<&str>, query: &Retrieve, ) -> Result { - match &self.mode { - Mode::Standalone(db) => db.retrieve(query).map_err(ServerError::from), - Mode::Cluster(cluster) => { - let region = cluster.resolve_region(region_name)?; - cluster.cluster.retrieve(region, query).map_err(ServerError::from) - } - } + ensure_standalone(region_name)?; + self.db.retrieve(query).map_err(ServerError::from) } /// Reload the Tantivy text index reader so the next search sees recently /// committed documents. /// /// On-disk indexes auto-reload via `OnCommitWithDelay`; ephemeral indexes - /// (the default for standalone and cluster modes) use `ReloadPolicy::Manual` - /// and require an explicit reload before each search. + /// (the default for standalone mode) use `ReloadPolicy::Manual` and require + /// an explicit reload before each search. 
pub fn reload_text_index(&self, region_name: Option<&str>) -> Result<()> { - match &self.mode { - Mode::Standalone(db) => db.reload_text_index().map_err(ServerError::from), - Mode::Cluster(cluster) => { - let region = cluster.resolve_region(region_name)?; - cluster - .cluster - .node(region) - .db - .reload_text_index() - .map_err(ServerError::from) - } - } + ensure_standalone(region_name)?; + self.db.reload_text_index().map_err(ServerError::from) } pub fn search( @@ -215,129 +104,12 @@ impl ServerState { region_name: Option<&str>, query: &Search, ) -> Result { - match &self.mode { - Mode::Standalone(db) => db.search(query).map_err(ServerError::from), - Mode::Cluster(cluster) => { - let region = cluster.resolve_region(region_name)?; - cluster.cluster.search(region, query).map_err(ServerError::from) - } - } + ensure_standalone(region_name)?; + self.db.search(query).map_err(ServerError::from) } pub fn item_count(&self, region_name: Option<&str>) -> Result { - match &self.mode { - Mode::Standalone(db) => Ok(db.item_count()), - Mode::Cluster(cluster) => { - let region = cluster.resolve_region(region_name)?; - Ok(cluster.cluster.item_count(region)) - } - } - } - - pub fn cluster_status(&self) -> Option { - match &self.mode { - Mode::Cluster(cluster) => { - let leader_id = cluster.cluster.leader_region(); - let leader_name = cluster - .regions - .id_to_name - .get(&leader_id.0) - .cloned() - .unwrap_or_else(|| format!("region-{}", leader_id.0)); - let leader_seqno = cluster.cluster.leader_seqno(); - let statuses = cluster - .regions - .id_to_name - .iter() - .map(|(id, name)| { - let rid = RegionId(*id); - let applied = cluster.cluster.applied_count(rid); - // The leader writes directly (no receiver thread), so its - // applied_seqno via replication is always 0. Report 0 lag - // for the leader since it is the authoritative source. 
- let lag = if rid == leader_id { - 0 - } else { - (leader_seqno as i64 - applied as i64).max(0) - }; - RegionStatus { - name: name.clone(), - applied_events: applied, - lag_events: lag, - partitioned: cluster.cluster.is_partitioned(rid), - } - }) - .collect(); - Some(ClusterHealth { - leader: leader_name, - relay_log_len: cluster.cluster.relay_log_len(), - regions: statuses, - }) - } - _ => None, - } - } - - pub fn promote_leader(&self, region_name: &str) -> Result<()> { - match &self.mode { - Mode::Cluster(cluster) => { - let region = cluster.regions.lookup(region_name).ok_or_else(|| { - ServerError::BadRequest(format!("unknown region '{region_name}'")) - })?; - cluster.cluster.promote_leader(region); - Ok(()) - } - _ => Err(ServerError::BadRequest( - "leader promotion only supported in cluster mode".into(), - )), - } - } - - pub fn partition_region(&self, region_name: &str) -> Result<()> { - match &self.mode { - Mode::Cluster(cluster) => { - let region = cluster.regions.lookup(region_name).ok_or_else(|| { - ServerError::BadRequest(format!("unknown region '{region_name}'")) - })?; - cluster.cluster.partition_region(region); - Ok(()) - } - _ => Err(ServerError::BadRequest( - "partitions only supported in cluster mode".into(), - )), - } - } - - pub fn heal_region(&self, region_name: &str) -> Result<()> { - match &self.mode { - Mode::Cluster(cluster) => { - let region = cluster.regions.lookup(region_name).ok_or_else(|| { - ServerError::BadRequest(format!("unknown region '{region_name}'")) - })?; - cluster.cluster.heal_region(region); - Ok(()) - } - _ => Err(ServerError::BadRequest( - "partitions only supported in cluster mode".into(), - )), - } - } -} - -impl ClusterState { - fn resolve_region(&self, name: Option<&str>) -> Result { - if let Some(name) = name { - self.regions - .lookup(name) - .ok_or_else(|| ServerError::BadRequest(format!("unknown region '{name}'"))) - } else { - Ok(self.cluster.leader_region()) - } - } -} - -impl RegionDirectory { - fn 
lookup(&self, name: &str) -> Option { - self.name_to_id.get(&name.trim().to_lowercase()).copied() + ensure_standalone(region_name)?; + Ok(self.db.item_count()) } } diff --git a/tidal-server/tests/middleware.rs b/tidal-server/tests/middleware.rs new file mode 100644 index 0000000..f198917 --- /dev/null +++ b/tidal-server/tests/middleware.rs @@ -0,0 +1,209 @@ +//! Integration tests for the tidal-server middleware stack. +//! +//! Each test spins up an in-process router using `tower::ServiceExt::oneshot` +//! — no TCP port binding required — and verifies middleware behaviors: +//! body limits, authentication, and request-ID propagation. + +use std::sync::Arc; + +use axum::body::Body; +use axum::http::{Method, Request, StatusCode}; +use tidal_server::{router::build_router, state::ServerState}; +use tidaldb::TidalDb; +use tower::ServiceExt; + +// ── Test helpers ───────────────────────────────────────────────────────────── + +fn make_state() -> Arc { + // Use the default schema so ranking profiles are available — same setup as + // `run_standalone` in main.rs. + let schema = tidal_server::config::load_schema(None).unwrap(); + let db = TidalDb::builder() + .ephemeral() + .with_schema(schema) + .open() + .unwrap(); + Arc::new(ServerState::new(db)) +} + +fn make_app(api_key: Option<&str>) -> axum::Router { + let key = api_key.map(|k| Arc::from(k)); + build_router(make_state(), key) +} + +// ── Auth tests ──────────────────────────────────────────────────────────────── + +/// /health is public — no Bearer token required. +#[tokio::test] +async fn health_no_auth_required() { + let app = make_app(Some("secret")); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +/// Protected routes (e.g. /feed) require a Bearer token when a key is set. 
+#[tokio::test] +async fn protected_route_requires_auth() { + let app = make_app(Some("secret")); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/feed") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); +} + +/// A valid Bearer token grants access to protected routes. +#[tokio::test] +async fn protected_route_accepts_valid_key() { + let app = make_app(Some("secret")); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/feed") + .header("Authorization", "Bearer secret") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +/// RFC 7235: auth scheme is case-insensitive — `bearer` should work like `Bearer`. +#[tokio::test] +async fn lowercase_bearer_accepted() { + let app = make_app(Some("secret")); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/feed") + .header("Authorization", "bearer secret") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +/// An incorrect Bearer token is rejected with 401. +#[tokio::test] +async fn wrong_key_rejected() { + let app = make_app(Some("secret")); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/feed") + .header("Authorization", "Bearer wrong") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); +} + +/// When no API key is configured, all routes are accessible without auth. 
+#[tokio::test] +async fn no_auth_mode_allows_all_routes() { + let app = make_app(None); + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/feed") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +// ── Middleware behavior tests ───────────────────────────────────────────────── + +/// Bodies exceeding 2MB on protected routes are rejected with 413. +#[tokio::test] +async fn body_too_large_returns_413() { + let app = make_app(Some("secret")); + // 3 MB exceeds the 2 MB BODY_LIMIT_BYTES constant in router.rs. + let big_body = vec![0u8; 3 * 1024 * 1024]; + let response = app + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/embeddings") + .header("Authorization", "Bearer secret") + .header("Content-Type", "application/json") + .body(Body::from(big_body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); +} + +/// x-request-id is present in every response and increments per request. +#[tokio::test] +async fn x_request_id_increments() { + // Both calls share the same App (and therefore the same AtomicU64 counter). 
+ let app = make_app(None); + let r1 = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let r2 = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let id1: u64 = r1 + .headers() + .get("x-request-id") + .expect("x-request-id missing from first response") + .to_str() + .unwrap() + .parse() + .unwrap(); + let id2: u64 = r2 + .headers() + .get("x-request-id") + .expect("x-request-id missing from second response") + .to_str() + .unwrap() + .parse() + .unwrap(); + + assert!(id2 > id1, "x-request-id should increase: {id1} then {id2}"); +} diff --git a/tidal/src/db/backup.rs b/tidal/src/db/backup.rs index 070dae9..1e0a7f3 100644 --- a/tidal/src/db/backup.rs +++ b/tidal/src/db/backup.rs @@ -189,6 +189,10 @@ impl super::TidalDb { })?; // 4. Checkpoint signal ledger synchronously. + // + // This is fatal: if the checkpoint fails, WAL replay on restore would + // double-count signals (replay re-applies events that were never + // persisted to the checkpoint keyspace). let seq = self.last_wal_seq.load(Ordering::Acquire); if let (Some(ledger), Some(storage)) = (&self.ledger, &self.storage) { let meta = crate::signals::checkpoint::CheckpointMeta { @@ -196,18 +200,22 @@ impl super::TidalDb { wal_sequence: seq, payload_hash: [0u8; 32], }; - if let Err(e) = ledger.checkpoint(storage.items_engine(), meta) { - tracing::warn!(error = %e, "backup: signal checkpoint failed (non-fatal)"); - } + ledger + .checkpoint(storage.items_engine(), meta) + .map_err(|e| { + TidalError::Durability(DurabilityError { + message: format!("backup: signal checkpoint failed: {e}"), + }) + })?; - // Checkpoint cohort signal state if any entries exist. + // Cohort and co-engagement checkpoints are secondary state; log + // and continue rather than aborting the backup if they fail. 
if self.cohort_ledger.entry_count() > 0 && let Err(e) = self.cohort_ledger.checkpoint(storage.items_engine(), meta) { tracing::warn!(error = %e, "backup: cohort checkpoint failed (non-fatal)"); } - // Checkpoint co-engagement edges if any exist. if self.co_engagement.edge_count() > 0 && let Err(e) = self.co_engagement.checkpoint(storage.items_engine()) { @@ -215,7 +223,8 @@ impl super::TidalDb { } } - // 5. Flush text indexes. + // 5. Flush text indexes (non-fatal: search indexes are rebuilt on + // demand; their absence does not affect data integrity). if let Err(e) = self.flush_text_index() { tracing::warn!(error = %e, "backup: text index flush failed (non-fatal)"); } @@ -224,10 +233,16 @@ impl super::TidalDb { } // 6. Flush fjall storage (all keyspaces). - if let Some(storage) = &self.storage - && let Err(e) = storage.flush() - { - tracing::warn!(error = %e, "backup: storage flush failed (non-fatal)"); + // + // This is fatal: if the memtable cannot be flushed, the backup + // directory will be missing recently written data. Callers can + // retry once disk pressure subsides. + if let Some(storage) = &self.storage { + storage.flush().map_err(|e| { + TidalError::Durability(DurabilityError { + message: format!("backup: storage flush failed: {e}"), + }) + })?; } // 7. Copy data directory to destination. diff --git a/tidal/src/db/metrics/histogram.rs b/tidal/src/db/metrics/histogram.rs index 952b2c5..719c7ee 100644 --- a/tidal/src/db/metrics/histogram.rs +++ b/tidal/src/db/metrics/histogram.rs @@ -10,6 +10,15 @@ use std::sync::atomic::{AtomicU64, Ordering}; #[cfg(feature = "metrics")] pub const WRITE_LATENCY_BOUNDS: &[u64] = &[1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000]; +/// Bucket boundaries for query latency (retrieve + search), in microseconds. +/// +/// Range: 100µs – 10s. Retrieve and search are 10x–100x slower than signal +/// writes so the buckets are wider and cover cold-start and degraded conditions. 
+#[cfg(feature = "metrics")] +pub const QUERY_LATENCY_BOUNDS: &[u64] = &[ + 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000, 10_000_000, +]; + /// A Prometheus-compatible cumulative histogram. /// /// Each bucket count represents the number of observations less than or equal diff --git a/tidal/src/db/metrics/mod.rs b/tidal/src/db/metrics/mod.rs index bc32898..00ef22c 100644 --- a/tidal/src/db/metrics/mod.rs +++ b/tidal/src/db/metrics/mod.rs @@ -11,7 +11,9 @@ pub(crate) mod histogram; #[cfg(feature = "metrics")] -pub(crate) use histogram::{LatencyHistogram, WRITE_LATENCY_BOUNDS, write_metric_line}; +pub(crate) use histogram::{ + LatencyHistogram, QUERY_LATENCY_BOUNDS, WRITE_LATENCY_BOUNDS, write_metric_line, +}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::time::Instant; @@ -64,6 +66,14 @@ pub struct MetricsState { #[cfg(feature = "metrics")] pub(crate) signal_write_latency: LatencyHistogram, + /// Retrieve query end-to-end latency histogram (microseconds). + #[cfg(feature = "metrics")] + pub(crate) retrieve_latency: LatencyHistogram, + + /// Search query end-to-end latency histogram (microseconds). + #[cfg(feature = "metrics")] + pub(crate) search_latency: LatencyHistogram, + // ── Session + cohort + degradation metrics (m7p4, task-04) ───────── /// Number of currently active sessions. 
#[cfg(feature = "metrics")] @@ -129,6 +139,10 @@ impl MetricsState { #[cfg(feature = "metrics")] signal_write_latency: LatencyHistogram::new(WRITE_LATENCY_BOUNDS), #[cfg(feature = "metrics")] + retrieve_latency: LatencyHistogram::new(QUERY_LATENCY_BOUNDS), + #[cfg(feature = "metrics")] + search_latency: LatencyHistogram::new(QUERY_LATENCY_BOUNDS), + #[cfg(feature = "metrics")] active_sessions: AtomicU64::new(0), #[cfg(feature = "metrics")] closed_sessions_total: AtomicU64::new(0), @@ -262,6 +276,16 @@ impl MetricsState { "Signal write latency in microseconds", )); + out.push_str(&self.retrieve_latency.render_prometheus( + "tidaldb_retrieve_latency_us", + "Retrieve query end-to-end latency in microseconds", + )); + + out.push_str(&self.search_latency.render_prometheus( + "tidaldb_search_latency_us", + "Search query end-to-end latency in microseconds", + )); + // Index health metrics. write_metric_line( &mut out, diff --git a/tidal/src/db/query_ops.rs b/tidal/src/db/query_ops.rs index 3d4065d..f0d09d8 100644 --- a/tidal/src/db/query_ops.rs +++ b/tidal/src/db/query_ops.rs @@ -38,6 +38,10 @@ impl TidalDb { } } + // Record query latency from this point forward. + #[cfg(feature = "metrics")] + let query_start = std::time::Instant::now(); + // Enter the load detector. The guard decrements the in-flight counter // on drop (method return, success or error). 
let (degradation_level, _inflight_guard) = self.load_detector.enter(); @@ -118,7 +122,13 @@ impl TidalDb { base_executor }; - executor.execute(query).map_err(TidalError::from) + let result = executor.execute(query).map_err(TidalError::from); + #[cfg(feature = "metrics")] + { + let elapsed_us = query_start.elapsed().as_micros() as u64; + self.metrics.retrieve_latency.observe(elapsed_us); + } + result } /// Execute a SEARCH query -- text and/or vector retrieval with RRF fusion, @@ -141,6 +151,10 @@ impl TidalDb { #[allow(clippy::too_many_lines)] #[tracing::instrument(skip_all, fields(query = ?query.query_text, kind = ?query.entity_kind))] pub fn search(&self, query: &Search) -> crate::Result { + // Record query latency from this point forward. + #[cfg(feature = "metrics")] + let query_start = std::time::Instant::now(); + // Guard against oversized filter expressions. const MAX_FILTER_NODES: usize = 256; if let Some(ref filter) = query.combined_filter() { @@ -260,7 +274,15 @@ impl TidalDb { base_executor }; - let result = executor.execute(query).map_err(TidalError::from)?; + let result = executor.execute(query).map_err(TidalError::from); + + #[cfg(feature = "metrics")] + { + let elapsed_us = query_start.elapsed().as_micros() as u64; + self.metrics.search_latency.observe(elapsed_us); + } + + let result = result?; // M6p5: record query text for trending autocomplete suggestions. if let Some(ref text) = query.query_text {