Major additions: - Community Next.js app (port 18187) for browsing claims with API docs - stemedb-chaos crate: Fault injection, chaos testing, CRDT properties - Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents - Disputed claims handling: Manual review workflows and validation - Aphoria security scanner: New extractors (SQL injection, command injection, weak crypto, TLS version), policy-based ignores, UAT reports - Docker infrastructure: Dockerfile, docker-compose.yml for full stack - VulnBank demo: Intentionally vulnerable multi-language test corpus SDK & API enhancements: - Source registry handlers for tracking data provenance - Metrics endpoint - Skeptic filtering improvements Code quality: - Split 14 large files (>500 lines) into focused modules - All files now under 500-line limit per project guidelines Documentation: - Chaos testing guide, circuit breakers, observability docs - Phase 7 UAT documentation updates - Martin Kleppmann technical writer agent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
154 lines
5.8 KiB
Python
154 lines
5.8 KiB
Python
import json
|
|
import pandas as pd
|
|
import numpy as np
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
|
|
# Configuration
|
|
TIER0_FILE = "../ingest-fda/tier0_regulatory_graph.jsonl"
|
|
TIER5_FILE = "../ingest-reddit/tier5_social_graph.jsonl"
|
|
OUTPUT_FILE = "divergence_report.json"
|
|
|
|
# Thresholds
|
|
MIN_CLUSTER_SIZE = 3 # Minimum Reddit posts to consider a "Signal"
|
|
DIVERGENCE_THRESHOLD = 0.5 # Score above this = Alert
|
|
|
|
class DivergenceEngine:
|
|
def __init__(self):
|
|
self.tier0_data = []
|
|
self.tier5_data = []
|
|
self.kb_tier0 = {} # Knowledge Base for Regulatory
|
|
|
|
def load_data(self):
|
|
print("[*] Loading Graph Data...")
|
|
|
|
# Load Tier 0 (Regulatory)
|
|
try:
|
|
with open(TIER0_FILE, 'r') as f:
|
|
for line in f:
|
|
self.tier0_data.append(json.loads(line))
|
|
print(f" - Loaded {len(self.tier0_data)} Regulatory assertions")
|
|
except FileNotFoundError:
|
|
print(f"[!] Tier 0 file missing. Run ingest-fda first.")
|
|
|
|
# Load Tier 5 (Social)
|
|
try:
|
|
with open(TIER5_FILE, 'r') as f:
|
|
for line in f:
|
|
self.tier5_data.append(json.loads(line))
|
|
print(f" - Loaded {len(self.tier5_data)} Social assertions")
|
|
except FileNotFoundError:
|
|
print(f"[!] Tier 5 file missing. Run ingest-reddit first.")
|
|
|
|
def build_regulatory_kb(self):
|
|
"""
|
|
Organizes Tier 0 data into a queryable structure.
|
|
KB Structure: { 'molecule': { 'side_effect_keyword': True } }
|
|
"""
|
|
print("[*] Building Regulatory Knowledge Base...")
|
|
for assertion in self.tier0_data:
|
|
molecule = assertion['subject'].lower()
|
|
text = assertion['object'].lower()
|
|
|
|
if molecule not in self.kb_tier0:
|
|
self.kb_tier0[molecule] = set()
|
|
|
|
# Naive NLP: Bag of words for now (Production would use Embeddings)
|
|
# We map specific FDA phrases to common terms
|
|
if "nausea" in text: self.kb_tier0[molecule].add("nausea")
|
|
if "vomit" in text: self.kb_tier0[molecule].add("vomiting")
|
|
if "gastroparesis" in text: self.kb_tier0[molecule].add("gastroparesis")
|
|
if "paralysis" in text: self.kb_tier0[molecule].add("stomach paralysis")
|
|
# ... expanded vocabulary ...
|
|
|
|
def detect_signals(self):
|
|
"""
|
|
Clusters Tier 5 data to find "Latent Signals".
|
|
"""
|
|
print("[*] detecting Latent Signals in Social Data...")
|
|
signals = {} # { (molecule, symptom): [list_of_posts] }
|
|
|
|
for assertion in self.tier5_data:
|
|
mol = assertion['subject'].lower()
|
|
sym = assertion['object'].lower()
|
|
|
|
key = (mol, sym)
|
|
if key not in signals:
|
|
signals[key] = []
|
|
signals[key].append(assertion)
|
|
|
|
# Filter noise (Keep only clusters > MIN_CLUSTER_SIZE)
|
|
valid_signals = {k: v for k, v in signals.items() if len(v) >= MIN_CLUSTER_SIZE}
|
|
print(f" - Found {len(valid_signals)} valid clusters (>{MIN_CLUSTER_SIZE} reports)")
|
|
return valid_signals
|
|
|
|
def compute_divergence(self, signals):
|
|
"""
|
|
THE SKEPTIC LENS:
|
|
Compares Social Signals against Regulatory KB.
|
|
"""
|
|
print("[*] Running Divergence Analysis (The Skeptic Lens)...")
|
|
results = []
|
|
|
|
for (molecule, symptom), cluster in signals.items():
|
|
# 1. Check if known in Regulatory (Tier 0)
|
|
known_risks = self.kb_tier0.get(molecule, set())
|
|
|
|
is_known = False
|
|
# Fuzzy match check
|
|
for risk in known_risks:
|
|
if symptom in risk or risk in symptom:
|
|
is_known = True
|
|
break
|
|
|
|
# 2. Calculate Scores
|
|
volume_score = min(len(cluster) / 20.0, 1.0) # Cap at 20 posts
|
|
severity_score = 0.8 if any(a['source_metadata'].get('severity') == 'high' for a in cluster) else 0.4
|
|
|
|
# THE FORMULA:
|
|
# If known in Tier 0 -> Divergence is Low (it's just a known side effect)
|
|
# If unknown in Tier 0 -> Divergence is High (The FDA doesn't know/say this)
|
|
|
|
if is_known:
|
|
divergence_score = 0.1 * volume_score # "Consensus"
|
|
status = "KNOWN_RISK"
|
|
else:
|
|
divergence_score = 0.9 * volume_score * severity_score # "Conflict"
|
|
status = "LATENT_SIGNAL"
|
|
|
|
report = {
|
|
"molecule": molecule,
|
|
"signal": symptom,
|
|
"volume": len(cluster),
|
|
"divergence_score": round(divergence_score, 2),
|
|
"status": status,
|
|
"regulatory_status": "Listed" if is_known else "Silent/Absent",
|
|
"drivers": [a['source_metadata']['reddit_id'] for a in cluster[:3]] # Examples
|
|
}
|
|
results.append(report)
|
|
|
|
return sorted(results, key=lambda x: x['divergence_score'], reverse=True)
|
|
|
|
def run(self):
|
|
self.load_data()
|
|
if not self.tier0_data or not self.tier5_data:
|
|
print("[!] Insufficient data to run engine.")
|
|
return
|
|
|
|
self.build_regulatory_kb()
|
|
signals = self.detect_signals()
|
|
report = self.compute_divergence(signals)
|
|
|
|
# Output
|
|
with open(OUTPUT_FILE, 'w') as f:
|
|
json.dump(report, f, indent=2)
|
|
|
|
print(f"\n[OK] Analysis Complete. Found {len(report)} signals.")
|
|
print("--- TOP 3 DIVERGENCES ---")
|
|
for r in report[:3]:
|
|
print(f" > {r['molecule'].upper()} :: {r['signal']} (Score: {r['divergence_score']}) [{r['status']}]")
|
|
|
|
if __name__ == "__main__":
|
|
engine = DivergenceEngine()
|
|
engine.run()
|