stemedb/latent/ingest-reddit/main.py
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

251 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
LATENT: Tier 5 (Social) Ingestor - Reddit
No API credentials needed - uses public .json endpoints
"""
import hashlib
import json
import os
import re
import time
import uuid
from datetime import datetime

import requests
from dotenv import load_dotenv
load_dotenv()
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Subreddits scanned on every run.
TARGET_SUBREDDITS = [
    "Ozempic",
    "Mounjaro",
    "Semaglutide",
    "Wegovy",
]

# Lower-case terms used to pre-filter posts before assertion extraction.
KEYWORDS = [
    "stomach",
    "paralysis",
    "gastroparesis",
    "vomit",
    "nausea",
    "er",
    "emergency",
    "hospital",
    "pain",
    "stopped working",
    "hair loss",
    "side effect",
]

# StemeDB Source Class for Social/Anecdotal
SOURCE_CLASS_SOCIAL = 5

# Browser-like User-Agent, split across lines purely for readability.
_BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)

# Headers to avoid being blocked
HEADERS = {
    "User-Agent": _BROWSER_USER_AGENT,
    "Accept": "application/json",
}
def _mentions_keyword(content: str, keywords) -> bool:
    """Return True when *content* (already lower-cased) mentions any keyword.

    Keywords of up to three characters (e.g. "er") must match as a whole
    word: as plain substrings they hit nearly every post ("never", "after",
    "better"), which defeats the filter. Longer keywords keep substring
    semantics so that "vomit" still matches "vomiting".
    """
    for keyword in keywords:
        needle = keyword.strip().lower()
        if not needle:
            continue
        if len(needle) <= 3:
            if re.search(rf"\b{re.escape(needle)}\b", content):
                return True
        elif needle in content:
            return True
    return False


def fetch_posts_json(subreddit_name: str, limit: int = 50) -> list:
    """
    Fetch posts from a subreddit using public .json endpoint.
    No API credentials required.

    Args:
        subreddit_name: Subreddit name without the ``r/`` prefix.
        limit: Value passed as the ``limit`` query parameter.

    Returns:
        A list of dicts (keys: id, title, text, url, created_utc, score,
        author, subreddit), one per post whose title or body mentions a
        term from KEYWORDS. The list is empty when the request fails or
        the response body is not valid JSON; the error is printed rather
        than raised.
    """
    print(f"[*] Scanning r/{subreddit_name}...")
    posts = []
    url = f"https://www.reddit.com/r/{subreddit_name}/new.json?limit={limit}"
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode errors on requests versions whose
        # decode error does not derive from RequestException.
        print(f"[!] Error fetching r/{subreddit_name}: {e}")
        return posts

    # `or` guards tolerate JSON nulls where a dict/list/str is expected.
    listing = data.get("data") or {}
    for child in listing.get("children") or []:
        post_data = child.get("data") or {}
        title = post_data.get("title") or ""
        body = post_data.get("selftext") or ""
        content = f"{title} {body}".lower()
        # Keyword filter
        if not _mentions_keyword(content, KEYWORDS):
            continue
        posts.append({
            "id": post_data.get("id"),
            "title": title,
            "text": body,
            "url": f"https://reddit.com{post_data.get('permalink', '')}",
            "created_utc": post_data.get("created_utc", 0),
            "score": post_data.get("score", 0),
            "author": post_data.get("author", "deleted"),
            "subreddit": subreddit_name,
        })
    return posts
# Map subreddit (lower-cased) to the drug it most likely discusses.
_DRUG_BY_SUBREDDIT = {
    "ozempic": "semaglutide",
    "wegovy": "semaglutide",
    "mounjaro": "tirzepatide",
    "semaglutide": "semaglutide",
}

# (pattern, predicate, object, severity), evaluated in order against the
# lower-cased post text. "er" is matched as a whole word so that "ER.",
# "(ER)" or a trailing "ER" count, while "never"/"after" do not.
_HEURISTIC_RULES = (
    (re.compile(r"paralysis|gastroparesis"), "side_effect", "gastroparesis", "high"),
    (re.compile(r"vomit|throw up|throwing up"), "side_effect", "vomiting", "medium"),
    (re.compile(r"nausea"), "side_effect", "nausea", "low"),
    (re.compile(r"hair loss|losing hair"), "side_effect", "hair_loss", "medium"),
    (re.compile(r"hospital|emergency|\ber\b"), "adverse_event", "hospitalization", "high"),
    (re.compile(r"stopped working"), "efficacy_issue", "tolerance", "medium"),
)


def extract_assertion_mock(post: dict) -> list:
    """
    Heuristic extraction when OpenAI key is missing.

    Args:
        post: Dict with at least ``title``, ``text`` and ``subreddit``.
            ``title``/``text`` may be None and are then treated as empty.

    Returns:
        A list of finding dicts (subject, predicate, object, severity),
        one per matching rule, in rule order. Empty when nothing matches.
    """
    text = f"{post['title'] or ''} {post['text'] or ''}".lower()
    subject = _DRUG_BY_SUBREDDIT.get(post["subreddit"].lower(), "glp1_agonist")
    return [
        {
            "subject": subject,
            "predicate": predicate,
            "object": obj,
            "severity": severity,
        }
        for pattern, predicate, obj, severity in _HEURISTIC_RULES
        if pattern.search(text)
    ]
def extract_assertion_llm(post: dict) -> list:
    """
    Uses OpenAI to extract structured assertions from raw text.

    Args:
        post: Dict with at least ``title`` and ``text``; only the first
            1500 characters of ``text`` are sent to the model.

    Returns:
        The ``findings`` list from the model's JSON reply, restricted to
        dict entries. If the call fails or the reply is malformed, the
        result of ``extract_assertion_mock(post)`` is returned instead.
    """
    import openai
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    # Treat a missing body as empty so the slice below cannot raise.
    body = (post['text'] or '')[:1500]
    prompt = f"""Analyze this Reddit post about GLP-1 weight loss drugs.
Extract any adverse health events (side effects) mentioned.
Return JSON with findings list.
Post Title: {post['title']}
Post Body: {body}
Format:
{{
"findings": [
{{
"subject": "drug name (semaglutide/tirzepatide)",
"predicate": "side_effect",
"object": "specific symptom",
"severity": "low/medium/high"
}}
]
}}"""
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)
        findings = data.get("findings", [])
        # Model output is untrusted: callers invoke .get() on every
        # finding, so anything that is not a list of dicts must not leak.
        if not isinstance(findings, list):
            raise ValueError("'findings' is not a list")
        return [item for item in findings if isinstance(item, dict)]
    except Exception as e:
        # Deliberately broad: any API, network or parsing failure should
        # degrade to the heuristic extractor rather than abort the run.
        print(f"[!] LLM extraction failed: {e}")
        return extract_assertion_mock(post)  # Fallback to mock
def _stable_author_hash(author) -> int:
    """Return a signed 64-bit integer derived from SHA-256 of *author*.

    The built-in ``hash()`` on strings is salted per interpreter process,
    so it yields a different value for the same author on every run. A
    digest-based value stays the same across runs and machines.
    """
    digest = hashlib.sha256(str(author).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


def format_as_stemedb(raw_findings: list, post: dict) -> list:
    """
    Wraps extracted findings in the StemeDB Assertion envelope.

    Args:
        raw_findings: Finding dicts (subject, predicate, object, severity).
            Missing keys fall back to placeholder values; entries that are
            not dicts are skipped.
        post: Source post; ``id``, ``url``, ``subreddit``, ``author`` and
            ``created_utc`` are read.

    Returns:
        One assertion dict per finding, each with a fresh UUID, a fixed
        confidence of 0.5 and the post's provenance under
        ``source_metadata``.
    """
    author_hash = _stable_author_hash(post["author"])
    created_at = int(post["created_utc"])
    steme_assertions = []
    for finding in raw_findings:
        if not isinstance(finding, dict):
            continue
        steme_assertions.append({
            "id": str(uuid.uuid4()),
            "subject": finding.get("subject", "unknown_molecule"),
            "predicate": finding.get("predicate", "related_to"),
            "object": finding.get("object", "unknown_effect"),
            "confidence": 0.5,  # Tier 5 starts low
            "source_class": SOURCE_CLASS_SOCIAL,
            "source_metadata": {
                "type": "reddit_post",
                "reddit_id": post["id"],
                "url": post["url"],
                "subreddit": post["subreddit"],
                "author_hash": author_hash,
                "severity": finding.get("severity"),
            },
            "timestamp": created_at,
            "ingested_at": int(time.time()),
        })
    return steme_assertions
def main():
    """Scan every target subreddit, extract assertions, and write them as JSONL.

    The OpenAI extractor is used when OPENAI_API_KEY is set; otherwise the
    heuristic extractor is used. Output goes to ``tier5_social_graph.jsonl``
    in the current working directory, one JSON object per line.
    """
    banner = "=" * 50
    print(banner)
    print("LATENT: Tier 5 (Social) Ingestor - Reddit")
    print(banner)
    print("No API credentials needed - using public JSON endpoints\n")

    # Choose the extraction strategy once, up front.
    use_llm = bool(os.getenv("OPENAI_API_KEY"))
    if use_llm:
        extract = extract_assertion_llm
        print("[*] Using OpenAI for extraction\n")
    else:
        extract = extract_assertion_mock
        print("[*] Using heuristic extraction (set OPENAI_API_KEY for LLM)\n")

    collected = []
    for subreddit in TARGET_SUBREDDITS:
        matched = fetch_posts_json(subreddit, limit=50)
        print(f"[+] Found {len(matched)} keyword-matched posts in r/{subreddit}")
        for post in matched:
            findings = extract(post)
            if not findings:
                continue
            wrapped = format_as_stemedb(findings, post)
            collected.extend(wrapped)
            print(f" -> {post['id']}: {len(wrapped)} assertions")
        time.sleep(2)  # Rate limit politeness

    # Output
    output_file = "tier5_social_graph.jsonl"
    with open(output_file, "w") as f:
        f.writelines(json.dumps(record) + "\n" for record in collected)

    print(f"\n{banner}")
    print(f"[OK] {len(collected)} assertions written to {output_file}")
    print(banner)


if __name__ == "__main__":
    main()