stemedb/latent/ingest-reddit/adk-agent/tools.py
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

213 lines
6.8 KiB
Python

"""
ADK tools for the Reddit Adverse Event Agent.
These are plain functions that will be registered with the ADK Agent.
"""
import json
import time
from typing import Optional
import requests
# Support both package and script imports
try:
from .config import (
REDDIT_HEADERS,
ADVERSE_EVENT_KEYWORDS,
DRUG_MAP,
MIN_CONFIDENCE,
MAX_CONFIDENCE,
SOURCE_CLASS_SOCIAL,
DEFAULT_LIFECYCLE,
STEMEDB_URL,
ENV_STEMEDB_AGENT_SEED,
)
from .signer import Signer
from .stemedb_client import StemeDBClient
except ImportError:
from config import (
REDDIT_HEADERS,
ADVERSE_EVENT_KEYWORDS,
DRUG_MAP,
MIN_CONFIDENCE,
MAX_CONFIDENCE,
SOURCE_CLASS_SOCIAL,
DEFAULT_LIFECYCLE,
STEMEDB_URL,
ENV_STEMEDB_AGENT_SEED,
)
from signer import Signer
from stemedb_client import StemeDBClient
# Shared StemeDB client for this module; stays None until _get_client() creates it on first call.
_client: Optional[StemeDBClient] = None
def _get_client() -> StemeDBClient:
    """Return the module-wide StemeDB client, constructing it on first use.

    The client is built from STEMEDB_URL and a Signer loaded via
    ``Signer.from_env(ENV_STEMEDB_AGENT_SEED)``, then cached in the
    module-level ``_client`` so later calls reuse the same instance.
    """
    global _client
    # Fast path: a previous call already built the client.
    if _client is not None:
        return _client
    _client = StemeDBClient(STEMEDB_URL, Signer.from_env(ENV_STEMEDB_AGENT_SEED))
    return _client
def fetch_reddit_posts(subreddit: str, limit: int = 25) -> dict:
    """
    Fetch recent posts from a subreddit matching adverse event keywords.

    Requests the subreddit's public ``new.json`` listing from Reddit and keeps
    only the posts whose title or body contains at least one entry of
    ADVERSE_EVENT_KEYWORDS (substring match against the lowercased text).

    Args:
        subreddit: Name of the subreddit to scan (e.g., "Ozempic", "Mounjaro").
            Must be a non-blank string; it is percent-encoded before being
            placed in the URL path.
        limit: Maximum number of posts to fetch (default: 25). Clamped to the
            range 1-100.

    Returns:
        A dictionary with:
        - subreddit: The subreddit name exactly as passed in
        - total_fetched: Number of posts retrieved from Reddit
        - matched_posts: Number of posts matching adverse event keywords
        - posts: List of matching posts with id, title, text (first 2000
          characters), url, created_utc, score, author, and detected_drug fields
        - error: Present only on failure (invalid subreddit, HTTP/network
          error, or a response body that is not the expected JSON object);
          the counts are then 0 and posts is empty

    Example:
        result = fetch_reddit_posts("Ozempic", limit=50)
        for post in result["posts"]:
            print(f"{post['title']} - {post['detected_drug']}")
    """
    # Function-scope import so this block does not depend on a file-level import.
    from urllib.parse import quote

    def _failure(message: str) -> dict:
        """Build the error-shaped result used for every failure path."""
        return {
            "subreddit": subreddit,
            "error": message,
            "total_fetched": 0,
            "matched_posts": 0,
            "posts": [],
        }

    # Reject blank/non-string names up front instead of requesting "/r//new.json".
    if not isinstance(subreddit, str) or not subreddit.strip():
        return _failure("subreddit must be a non-empty string")

    limit = max(1, min(limit, 100))  # Reddit API limit is 100; never ask for < 1

    # Percent-encode the name so characters such as "/", "?" or "#" cannot
    # change the request path or query. "+" is kept so Reddit's combined
    # "a+b" subreddit form still works.
    encoded_name = quote(subreddit, safe="+")
    url = f"https://www.reddit.com/r/{encoded_name}/new.json"

    try:
        response = requests.get(
            url,
            headers=REDDIT_HEADERS,
            params={"limit": limit},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        return _failure(str(e))

    if not isinstance(data, dict):
        return _failure("unexpected response format from Reddit")

    # "or" guards turn JSON nulls into empty containers instead of crashing.
    listing = data.get("data") or {}
    children = listing.get("children") or []

    # The drug is derived from the subreddit name only, so resolve it once.
    detected_drug = DRUG_MAP.get(subreddit.lower(), "glp1_agonist")

    matched_posts = []
    for child in children:
        post_data = child.get("data") or {}
        title = post_data.get("title") or ""
        selftext = post_data.get("selftext") or ""
        content = f"{title} {selftext}".lower()

        # Check for adverse event keywords
        # (assumes the configured keywords are lowercase — TODO confirm in config)
        if not any(keyword in content for keyword in ADVERSE_EVENT_KEYWORDS):
            continue

        matched_posts.append(
            {
                "id": post_data.get("id"),
                "title": title,
                "text": selftext[:2000],  # Truncate long posts
                "url": f"https://reddit.com{post_data.get('permalink') or ''}",
                "created_utc": int(post_data.get("created_utc") or 0),
                "score": post_data.get("score", 0),
                "author": post_data.get("author") or "deleted",
                "detected_drug": detected_drug,
            }
        )

    return {
        "subreddit": subreddit,
        "total_fetched": len(children),
        "matched_posts": len(matched_posts),
        "posts": matched_posts,
    }
def store_assertion(
    subject: str,
    predicate: str,
    object_value: str,
    confidence: float,
    source_url: str,
    severity: Optional[str] = None,
    reddit_post_id: Optional[str] = None,
) -> dict:
    """
    Store an assertion in StemeDB.

    Builds an assertion representing an extracted adverse event and submits it
    through the shared StemeDB client (``_get_client()``), which is constructed
    with the agent's Signer. Failures are reported in the returned dictionary
    rather than raised.

    Args:
        subject: The drug or entity (e.g., "semaglutide", "tirzepatide")
        predicate: The type of assertion (e.g., "side_effect", "adverse_event")
        object_value: The specific effect (e.g., "nausea", "gastroparesis")
        confidence: Confidence score. Must be a real number (NaN and
            non-numeric values are rejected); it is clamped into
            [MIN_CONFIDENCE, MAX_CONFIDENCE] before being stored.
        source_url: Reddit post URL for provenance
        severity: Optional severity level ("low", "medium", "high"); stored in
            the source metadata, as null when omitted
        reddit_post_id: Optional Reddit post ID, added to the source metadata
            as "reddit_id" when provided

    Returns:
        A dictionary with:
        - success: Boolean indicating if the assertion was stored
        - hash: Hash returned by the client for the assertion (if successful)
        - subject, predicate, object: The submitted values
        - confidence: The clamped confidence (if successful)
        - source_hash: First 16 characters of ``hash`` followed by "..."
          (if successful). NOTE(review): this is a display prefix of the
          assertion hash, not a separate hash of the source URL — confirm
          that is what callers expect.
        - error: Error message (if failed)

    Example:
        result = store_assertion(
            subject="semaglutide",
            predicate="side_effect",
            object_value="nausea",
            confidence=0.5,
            source_url="https://reddit.com/r/Ozempic/comments/abc123/...",
            severity="low"
        )
        if result["success"]:
            print(f"Stored with hash: {result['hash']}")
    """
    # Function-scope import so this block does not depend on a file-level import.
    import math

    def _failure(message: str) -> dict:
        """Build the error-shaped result used for every failure path."""
        return {
            "success": False,
            "error": message,
            "subject": subject,
            "predicate": predicate,
            "object": object_value,
        }

    # Validate before clamping: min()/max() compare with "<", and every
    # comparison against NaN is False, so a NaN would otherwise come out of the
    # clamp as MAX_CONFIDENCE — the most trusted value instead of a rejection.
    try:
        numeric_confidence = float(confidence)
    except (TypeError, ValueError):
        return _failure(f"confidence must be a number, got {confidence!r}")
    if math.isnan(numeric_confidence):
        return _failure("confidence must not be NaN")

    # Clamp confidence to allowed range for anecdotal data
    clamped_confidence = max(MIN_CONFIDENCE, min(MAX_CONFIDENCE, numeric_confidence))

    # Build source metadata
    metadata = {"type": "reddit_post", "severity": severity}
    if reddit_post_id:
        metadata["reddit_id"] = reddit_post_id
    metadata_json = json.dumps(metadata)

    try:
        client = _get_client()
        result = client.assert_fact(
            subject=subject,
            predicate=predicate,
            object_value=object_value,
            confidence=clamped_confidence,
            source_url=source_url,
            source_class=SOURCE_CLASS_SOCIAL,
            lifecycle=DEFAULT_LIFECYCLE,
            source_metadata=metadata_json,
        )
        return {
            "success": True,
            "hash": result.hash,
            "subject": subject,
            "predicate": predicate,
            "object": object_value,
            "confidence": clamped_confidence,
            "source_hash": result.hash[:16] + "...",  # Truncated for display
        }
    except Exception as e:
        # Deliberate catch-all at the tool boundary: the agent receives a
        # structured failure instead of an exception.
        return _failure(str(e))