stemedb/latent/divergence-engine/main.py
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

154 lines
5.8 KiB
Python

import json
import os
from collections import Counter
from datetime import datetime

import numpy as np
import pandas as pd
# Configuration
# Input/output paths are anchored to this script's directory rather than the
# current working directory: the ingest outputs live in sibling directories
# (../ingest-fda, ../ingest-reddit), so CWD-relative paths only worked when the
# script was launched from its own folder.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TIER0_FILE = os.path.normpath(
    os.path.join(_SCRIPT_DIR, "..", "ingest-fda", "tier0_regulatory_graph.jsonl")
)
TIER5_FILE = os.path.normpath(
    os.path.join(_SCRIPT_DIR, "..", "ingest-reddit", "tier5_social_graph.jsonl")
)
OUTPUT_FILE = os.path.join(_SCRIPT_DIR, "divergence_report.json")
# Thresholds
MIN_CLUSTER_SIZE = 3  # Minimum Reddit posts to consider a "Signal"
DIVERGENCE_THRESHOLD = 0.5  # Score above this = Alert
class DivergenceEngine:
    """Detect divergence between social (Tier 5) and regulatory (Tier 0) claims.

    ``run`` drives the pipeline: load both JSONL graphs, index the regulatory
    assertions into a keyword knowledge base, cluster the social assertions by
    (molecule, symptom), then score each cluster by whether the regulatory
    side already lists that symptom.
    """

    # Naive NLP: bag of words for now (Production would use Embeddings).
    # Each pair maps a substring searched for in the lower-cased FDA text to
    # the common term stored in the knowledge base.
    _KB_VOCABULARY = (
        ("nausea", "nausea"),
        ("vomit", "vomiting"),
        ("gastroparesis", "gastroparesis"),
        ("paralysis", "stomach paralysis"),
        # ... expanded vocabulary ...
    )

    def __init__(self, *, min_cluster_size=None, divergence_threshold=None):
        """Create an engine with no data and an empty knowledge base.

        Args:
            min_cluster_size: Minimum number of social posts (inclusive) for a
                (molecule, symptom) cluster to count as a signal. ``None``
                falls back to the module-level ``MIN_CLUSTER_SIZE``.
            divergence_threshold: Reports whose unrounded divergence score is
                strictly above this get ``"alert": True``. ``None`` falls back
                to the module-level ``DIVERGENCE_THRESHOLD``.
        """
        self.tier0_data = []
        self.tier5_data = []
        self.kb_tier0 = {}  # Knowledge Base for Regulatory: molecule -> set of keywords
        # Defaults are resolved here (not in the signature) so the module
        # constants remain the single source of truth for the script.
        self.min_cluster_size = (
            MIN_CLUSTER_SIZE if min_cluster_size is None else min_cluster_size
        )
        self.divergence_threshold = (
            DIVERGENCE_THRESHOLD if divergence_threshold is None else divergence_threshold
        )

    @staticmethod
    def _read_jsonl(path):
        """Return one parsed JSON value per non-blank line of *path*.

        Blank lines (e.g. a trailing empty line) are skipped instead of being
        handed to ``json.loads``, which rejects them. Raises
        ``FileNotFoundError`` if *path* does not exist and
        ``json.JSONDecodeError`` on a malformed line.
        """
        with open(path, "r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def load_data(self):
        """Append the Tier 0 and Tier 5 assertions read from their JSONL files.

        A missing file is reported and skipped, leaving that tier's list as it
        was; ``run`` then reports insufficient data.
        """
        print("[*] Loading Graph Data...")
        # Load Tier 0 (Regulatory)
        try:
            regulatory = self._read_jsonl(TIER0_FILE)
        except FileNotFoundError:
            print("[!] Tier 0 file missing. Run ingest-fda first.")
        else:
            self.tier0_data.extend(regulatory)
            print(f" - Loaded {len(self.tier0_data)} Regulatory assertions")
        # Load Tier 5 (Social)
        try:
            social = self._read_jsonl(TIER5_FILE)
        except FileNotFoundError:
            print("[!] Tier 5 file missing. Run ingest-reddit first.")
        else:
            self.tier5_data.extend(social)
            print(f" - Loaded {len(self.tier5_data)} Social assertions")

    def build_regulatory_kb(self):
        """Index Tier 0 assertions into ``self.kb_tier0``.

        KB structure: ``{molecule: {side_effect_keyword, ...}}``. Each molecule
        (lower-cased ``subject``) maps to the set of common terms whose trigger
        substring appears in the lower-cased ``object`` text. Every molecule
        seen gets an entry, even when no keyword matched.
        """
        print("[*] Building Regulatory Knowledge Base...")
        for assertion in self.tier0_data:
            molecule = assertion['subject'].lower()
            text = assertion['object'].lower()
            keywords = self.kb_tier0.setdefault(molecule, set())
            # We map specific FDA phrases to common terms.
            keywords.update(
                term for trigger, term in self._KB_VOCABULARY if trigger in text
            )

    def detect_signals(self):
        """Cluster Tier 5 data to find candidate "Latent Signals".

        Assertions are grouped by lower-cased ``(subject, object)``, i.e.
        ``(molecule, symptom)``. Clusters with fewer than
        ``self.min_cluster_size`` members are dropped as noise.

        Returns:
            dict mapping ``(molecule, symptom)`` to its list of assertions.
        """
        print("[*] detecting Latent Signals in Social Data...")
        signals = {}  # { (molecule, symptom): [list_of_posts] }
        for assertion in self.tier5_data:
            key = (assertion['subject'].lower(), assertion['object'].lower())
            signals.setdefault(key, []).append(assertion)
        # Filter noise: keep clusters with at least min_cluster_size reports.
        valid_signals = {
            k: v for k, v in signals.items() if len(v) >= self.min_cluster_size
        }
        print(f" - Found {len(valid_signals)} valid clusters (>={self.min_cluster_size} reports)")
        return valid_signals

    def compute_divergence(self, signals):
        """THE SKEPTIC LENS: compare social signals against the regulatory KB.

        For each cluster:
          * known: the symptom matches a KB keyword for the molecule by
            substring in either direction;
          * volume_score = min(cluster size / 20, 1.0);
          * severity_score = 0.8 if any member's ``source_metadata`` has
            ``severity == 'high'``, else 0.4;
          * known -> score 0.1 * volume_score, status ``KNOWN_RISK``;
            unknown -> score 0.9 * volume_score * severity_score,
            status ``LATENT_SIGNAL``.
        Assertions lacking ``source_metadata`` (or its keys) count as
        non-high severity and contribute ``None`` as a driver id.

        Args:
            signals: mapping ``(molecule, symptom) -> [assertion, ...]`` as
                returned by ``detect_signals``.

        Returns:
            list of report dicts sorted by rounded ``divergence_score``,
            highest first.
        """
        print("[*] Running Divergence Analysis (The Skeptic Lens)...")
        results = []
        for (molecule, symptom), cluster in signals.items():
            # 1. Check if known in Regulatory (Tier 0).
            # Fuzzy match: substring either way, so "vomit" matches "vomiting".
            known_risks = self.kb_tier0.get(molecule, set())
            is_known = any(symptom in risk or risk in symptom for risk in known_risks)

            # 2. Calculate Scores
            # Tolerate assertions without source_metadata instead of raising KeyError.
            metadata = [a.get('source_metadata') or {} for a in cluster]
            volume_score = min(len(cluster) / 20.0, 1.0)  # Cap at 20 posts
            severity_score = 0.8 if any(m.get('severity') == 'high' for m in metadata) else 0.4

            # THE FORMULA:
            # If known in Tier 0 -> Divergence is Low (it's just a known side effect)
            # If unknown in Tier 0 -> Divergence is High (The FDA doesn't know/say this)
            if is_known:
                divergence_score = 0.1 * volume_score  # "Consensus"
                status = "KNOWN_RISK"
            else:
                divergence_score = 0.9 * volume_score * severity_score  # "Conflict"
                status = "LATENT_SIGNAL"

            results.append({
                "molecule": molecule,
                "signal": symptom,
                "volume": len(cluster),
                "divergence_score": round(divergence_score, 2),
                "status": status,
                "regulatory_status": "Listed" if is_known else "Silent/Absent",
                # "Score above this = Alert", judged on the unrounded score.
                "alert": divergence_score > self.divergence_threshold,
                "drivers": [m.get('reddit_id') for m in metadata[:3]],  # Examples
            })
        return sorted(results, key=lambda x: x['divergence_score'], reverse=True)

    def run(self):
        """Run the full pipeline and write the sorted report to ``OUTPUT_FILE``.

        Returns early, writing nothing, if either tier is empty after loading.
        """
        self.load_data()
        if not self.tier0_data or not self.tier5_data:
            print("[!] Insufficient data to run engine.")
            return
        self.build_regulatory_kb()
        signals = self.detect_signals()
        report = self.compute_divergence(signals)
        # Output
        with open(OUTPUT_FILE, 'w', encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"\n[OK] Analysis Complete. Found {len(report)} signals.")
        print("--- TOP 3 DIVERGENCES ---")
        for r in report[:3]:
            print(f" > {r['molecule'].upper()} :: {r['signal']} (Score: {r['divergence_score']}) [{r['status']}]")
# Script entry point: run the whole pipeline with the module-level configuration.
if __name__ == "__main__":
    DivergenceEngine().run()