"""Divergence engine: compare social (Tier 5) signals with regulatory (Tier 0) data.

The script loads two JSONL assertion graphs, builds a small keyword knowledge
base from the regulatory assertions, clusters the social assertions by
(molecule, symptom), scores every cluster and writes the ranked result to
``OUTPUT_FILE`` as JSON.
"""

import json
from collections import Counter, defaultdict
from datetime import datetime

import numpy as np
import pandas as pd

# Configuration
TIER0_FILE = "../ingest-fda/tier0_regulatory_graph.jsonl"
TIER5_FILE = "../ingest-reddit/tier5_social_graph.jsonl"
OUTPUT_FILE = "divergence_report.json"

# Thresholds
MIN_CLUSTER_SIZE = 3  # Minimum Reddit posts to consider a "Signal"
# Score above this = Alert.
# NOTE(review): this constant is not applied anywhere in this file.
DIVERGENCE_THRESHOLD = 0.5

# Scoring parameters used by DivergenceEngine.compute_divergence
VOLUME_SATURATION = 20.0  # Cluster size at which the volume score caps at 1.0
SEVERITY_HIGH = 0.8  # Severity score when any post in the cluster is 'high'
SEVERITY_DEFAULT = 0.4  # Severity score otherwise
KNOWN_RISK_WEIGHT = 0.1  # Multiplier when Tier 0 already lists the symptom
LATENT_SIGNAL_WEIGHT = 0.9  # Multiplier when Tier 0 does not list the symptom

# (substring searched in the regulatory text, term stored in the KB)
_REGULATORY_TERMS = (
    ("nausea", "nausea"),
    ("vomit", "vomiting"),
    ("gastroparesis", "gastroparesis"),
    ("paralysis", "stomach paralysis"),
    # ... expanded vocabulary ...
)


class DivergenceEngine:
    """Scores how far social-media signals diverge from regulatory knowledge.

    Attributes are plain containers filled by the methods below:
    ``tier0_data`` / ``tier5_data`` hold the decoded assertion dicts and
    ``kb_tier0`` maps a lower-cased molecule name to a set of known terms.
    """

    def __init__(self):
        self.tier0_data = []
        self.tier5_data = []
        self.kb_tier0 = {}  # Knowledge Base for Regulatory

    @staticmethod
    def _parse_jsonl(lines):
        """Decode an iterable of JSONL lines, skipping blank ones.

        Blank lines (a trailing newline at end of file is the usual case) are
        ignored instead of being handed to ``json.loads``. A non-blank line
        that is not valid JSON still raises ``json.JSONDecodeError``.
        """
        return [json.loads(line) for line in lines if line.strip()]

    @staticmethod
    def _metadata(assertion):
        """Return the assertion's ``source_metadata`` dict, or ``{}`` if absent/None."""
        return assertion.get("source_metadata") or {}

    def _load_tier(self, path, target, label, missing_message):
        """Append the records found in ``path`` to ``target`` and report the count.

        A missing file is reported with ``missing_message`` and leaves
        ``target`` untouched.
        """
        try:
            # Explicit encoding: do not depend on the platform default.
            with open(path, "r", encoding="utf-8") as f:
                target.extend(self._parse_jsonl(f))
        except FileNotFoundError:
            print(missing_message)
        else:
            print(f" - Loaded {len(target)} {label} assertions")

    def load_data(self) -> None:
        """Load the Tier 0 and Tier 5 JSONL files into the instance lists.

        Raises:
            json.JSONDecodeError: if a non-blank line is not valid JSON.
        """
        print("[*] Loading Graph Data...")

        # Load Tier 0 (Regulatory)
        self._load_tier(
            TIER0_FILE,
            self.tier0_data,
            "Regulatory",
            "[!] Tier 0 file missing. Run ingest-fda first.",
        )

        # Load Tier 5 (Social)
        self._load_tier(
            TIER5_FILE,
            self.tier5_data,
            "Social",
            "[!] Tier 5 file missing. Run ingest-reddit first.",
        )

    def build_regulatory_kb(self) -> None:
        """Organize Tier 0 data into a queryable structure.

        KB structure: ``{ 'molecule': {'term', ...} }`` -- each lower-cased
        molecule maps to the set of terms whose trigger substring occurs in
        at least one of its assertion texts. A molecule with no matching
        text still gets an (empty) entry.
        """
        print("[*] Building Regulatory Knowledge Base...")

        for assertion in self.tier0_data:
            molecule = assertion["subject"].lower()
            text = assertion["object"].lower()

            known_terms = self.kb_tier0.setdefault(molecule, set())
            # Naive NLP: Bag of words for now (Production would use Embeddings)
            # We map specific FDA phrases to common terms
            known_terms.update(
                term for needle, term in _REGULATORY_TERMS if needle in text
            )

    def detect_signals(self) -> dict:
        """Cluster Tier 5 data to find "Latent Signals".

        Returns:
            ``{(molecule, symptom): [assertions]}`` holding only the clusters
            with at least ``MIN_CLUSTER_SIZE`` assertions; both key parts are
            lower-cased.
        """
        print("[*] detecting Latent Signals in Social Data...")

        clusters = defaultdict(list)  # { (molecule, symptom): [list_of_posts] }
        for assertion in self.tier5_data:
            key = (assertion["subject"].lower(), assertion["object"].lower())
            clusters[key].append(assertion)

        # Filter noise (keep only clusters with >= MIN_CLUSTER_SIZE posts)
        valid_signals = {
            key: posts
            for key, posts in clusters.items()
            if len(posts) >= MIN_CLUSTER_SIZE
        }

        print(f" - Found {len(valid_signals)} valid clusters (>={MIN_CLUSTER_SIZE} reports)")
        return valid_signals

    def compute_divergence(self, signals) -> list:
        """THE SKEPTIC LENS: compare social signals against the regulatory KB.

        Args:
            signals: mapping ``{(molecule, symptom): [assertions]}`` as
                returned by ``detect_signals``.

        Returns:
            A list of report dicts sorted by ``divergence_score``, highest
            first. ``drivers`` holds the ``reddit_id`` of those among the
            first three posts of the cluster that carry one.
        """
        print("[*] Running Divergence Analysis (The Skeptic Lens)...")

        results = []
        for (molecule, symptom), cluster in signals.items():
            # 1. Check if known in Regulatory (Tier 0).
            # Fuzzy match: either string containing the other counts as a hit.
            known_risks = self.kb_tier0.get(molecule, set())
            is_known = any(
                symptom in risk or risk in symptom for risk in known_risks
            )

            # 2. Calculate scores.
            volume_score = min(len(cluster) / VOLUME_SATURATION, 1.0)
            # Posts without metadata simply do not count as 'high' severity.
            has_high_severity = any(
                self._metadata(post).get("severity") == "high" for post in cluster
            )
            severity_score = SEVERITY_HIGH if has_high_severity else SEVERITY_DEFAULT

            # THE FORMULA:
            # If known in Tier 0 -> Divergence is Low (it's just a known side effect)
            # If unknown in Tier 0 -> Divergence is High (The FDA doesn't know/say this)
            if is_known:
                divergence_score = KNOWN_RISK_WEIGHT * volume_score  # "Consensus"
                status = "KNOWN_RISK"
            else:
                divergence_score = (
                    LATENT_SIGNAL_WEIGHT * volume_score * severity_score
                )  # "Conflict"
                status = "LATENT_SIGNAL"

            # Example posts: a missing id must not abort the whole report,
            # so posts without one are left out of the examples.
            drivers = [
                self._metadata(post)["reddit_id"]
                for post in cluster[:3]
                if "reddit_id" in self._metadata(post)
            ]

            results.append(
                {
                    "molecule": molecule,
                    "signal": symptom,
                    "volume": len(cluster),
                    "divergence_score": round(divergence_score, 2),
                    "status": status,
                    "regulatory_status": "Listed" if is_known else "Silent/Absent",
                    "drivers": drivers,
                }
            )

        return sorted(results, key=lambda x: x["divergence_score"], reverse=True)

    def run(self) -> None:
        """Run the full pipeline and write the report to ``OUTPUT_FILE``.

        Stops early (without writing anything) when either tier is empty.
        """
        self.load_data()
        if not self.tier0_data or not self.tier5_data:
            print("[!] Insufficient data to run engine.")
            return

        self.build_regulatory_kb()
        signals = self.detect_signals()
        report = self.compute_divergence(signals)

        # Output
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        print(f"\n[OK] Analysis Complete. Found {len(report)} signals.")
        print("--- TOP 3 DIVERGENCES ---")
        for r in report[:3]:
            print(f" > {r['molecule'].upper()} :: {r['signal']} (Score: {r['divergence_score']}) [{r['status']}]")


if __name__ == "__main__":
    engine = DivergenceEngine()
    engine.run()