Major additions: - Community Next.js app (port 18187) for browsing claims with API docs - stemedb-chaos crate: Fault injection, chaos testing, CRDT properties - Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents - Disputed claims handling: Manual review workflows and validation - Aphoria security scanner: New extractors (SQL injection, command injection, weak crypto, TLS version), policy-based ignores, UAT reports - Docker infrastructure: Dockerfile, docker-compose.yml for full stack - VulnBank demo: Intentionally vulnerable multi-language test corpus SDK & API enhancements: - Source registry handlers for tracking data provenance - Metrics endpoint - Skeptic filtering improvements Code quality: - Split 14 large files (>500 lines) into focused modules - All files now under 500-line limit per project guidelines Documentation: - Chaos testing guide, circuit breakers, observability docs - Phase 7 UAT documentation updates - Martin Kleppmann technical writer agent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
90 lines
2.1 KiB
Python
90 lines
2.1 KiB
Python
"""
|
|
VulnBank - Database Operations with intentional vulnerabilities

Vulnerabilities:
- SQL injection via string formatting (f-string)
- SQL injection via format()
- SQL injection via string concatenation
- Hardcoded database password
|
|
"""
|
|
|
|
import mysql.connector
|
|
|
|
|
|
def get_connection():
    """Open and return a new MySQL connection to the local ``vulnbank`` database.

    The credentials are literal values on purpose: this module is part of an
    intentionally vulnerable demo, and the hardcoded password is one of the
    findings it is meant to exhibit.
    """
    connection = mysql.connector.connect(
        host="localhost",
        user="vulnbank",
        password="password123",  # BLOCK: Hardcoded password
        database="vulnbank",
    )
    return connection
|
|
|
|
|
|
def get_user_by_id(user_id: str) -> dict:
    """
    VULNERABILITY: SQL injection via f-string
    User input directly interpolated into SQL query

    The injectable query is intentional (this file is a vulnerable demo) and
    is kept unchanged; get_user_by_id_safe shows the parameterized form.

    Args:
        user_id: Value interpolated verbatim into the WHERE clause.

    Returns:
        Whatever ``cursor.fetchone()`` yields on a dictionary cursor for the
        first matching row.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            # BLOCK: SQL injection - user input directly in query string
            query = f"SELECT * FROM users WHERE id = {user_id}"
            cursor.execute(query)

            result = cursor.fetchone()
            return result
        finally:
            # Release the cursor even when execute()/fetchone() raises.
            cursor.close()
    finally:
        # Release the connection on every path so a failing query cannot leak it.
        conn.close()
|
|
|
|
|
|
def delete_order(order_id: str) -> bool:
    """
    VULNERABILITY: SQL injection via format()
    User input directly interpolated into SQL query

    The injectable query is intentional (this file is a vulnerable demo) and
    is kept unchanged.

    Args:
        order_id: Value substituted verbatim into the WHERE clause.

    Returns:
        True when the statement reports at least one affected row,
        False otherwise.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            # BLOCK: SQL injection - format() with user input
            query = "DELETE FROM orders WHERE id = {}".format(order_id)
            cursor.execute(query)

            conn.commit()
            # Read rowcount before the cursor is closed.
            affected = cursor.rowcount
            return affected > 0
        finally:
            # Release the cursor even when execute()/commit() raises.
            cursor.close()
    finally:
        # Release the connection on every path so a failing statement cannot leak it.
        conn.close()
|
|
|
|
|
|
def search_products(name: str) -> list:
    """
    VULNERABILITY: SQL injection via string concatenation

    The injectable query is intentional (this file is a vulnerable demo) and
    is kept unchanged.

    Args:
        name: Text concatenated verbatim between the ``%`` wildcards of the
            LIKE pattern.

    Returns:
        All rows produced by ``cursor.fetchall()`` on a dictionary cursor.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            # BLOCK: SQL injection - string concatenation
            query = "SELECT * FROM products WHERE name LIKE '%" + name + "%'"
            cursor.execute(query)

            results = cursor.fetchall()
            return results
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()
    finally:
        # Release the connection on every path so a failing query cannot leak it.
        conn.close()
|
|
|
|
|
|
# Safe version for comparison
|
|
def get_user_by_id_safe(user_id: str) -> dict:
    """Safe version using parameterized queries

    The user-supplied value is passed to ``cursor.execute`` as a separate
    parameter tuple instead of being spliced into the SQL text.

    Args:
        user_id: Value bound to the ``%s`` placeholder of the WHERE clause.

    Returns:
        Whatever ``cursor.fetchone()`` yields on a dictionary cursor for the
        first matching row.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            # This is the correct approach - parameterized query
            query = "SELECT * FROM users WHERE id = %s"
            cursor.execute(query, (user_id,))

            result = cursor.fetchone()
            return result
        finally:
            # Release the cursor even when execute()/fetchone() raises.
            cursor.close()
    finally:
        # Release the connection on every path so a failing query cannot leak it.
        conn.close()
|