stemedb/latent/ingest-reddit/adk-agent/signer.py
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

167 lines
4.9 KiB
Python

"""
Ed25519 signing for StemeDB assertions.
Mirrors the Go SDK pattern from sdk/go/steme/signer.go
"""
import os
import time
from dataclasses import dataclass
from nacl.signing import SigningKey, VerifyKey
from nacl.exceptions import BadSignatureError
@dataclass
class SignatureEntry:
    """
    One agent's signature attached to a StemeDB assertion.

    Attributes:
        agent_id: Hex-encoded 32-byte public key of the signing agent.
        signature: Hex-encoded 64-byte Ed25519 signature.
        timestamp: Unix epoch timestamp associated with the signature.
        version: Signature format version (1 = v1 legacy).
    """

    agent_id: str
    signature: str
    timestamp: int
    version: int
class Signer:
    """
    Ed25519 signing identity for producing StemeDB assertion signatures.

    StemeDB requires every submitted assertion to carry at least one agent
    signature. This class wraps a PyNaCl ``SigningKey`` and exposes the
    hex-encoded values (public key, seed, signatures) callers work with.
    """

    def __init__(self, signing_key: SigningKey):
        """
        Wrap an existing Ed25519 signing key.

        Instances are normally built through the alternate constructors
        ``generate``, ``from_hex`` or ``from_env``.
        """
        self._signing_key = signing_key
        # Keep the matching verify key alongside so public_key needs no lookup.
        self._verify_key = signing_key.verify_key

    @classmethod
    def generate(cls) -> "Signer":
        """
        Build a Signer around a freshly generated random keypair.

        Handy for tests or for minting a new agent identity; read the
        ``seed`` property afterwards if the key has to be persisted.
        """
        fresh_key = SigningKey.generate()
        return cls(fresh_key)

    @classmethod
    def from_hex(cls, hex_seed: str) -> "Signer":
        """
        Build a Signer from a hex-encoded 32-byte seed (64 hex characters).

        Raises:
            ValueError: If hex_seed is not valid hex or does not decode to
                exactly 32 bytes.
        """
        try:
            seed_bytes = bytes.fromhex(hex_seed)
        except ValueError as e:
            raise ValueError(f"Invalid hex string: {e}") from e

        if len(seed_bytes) == 32:
            return cls(SigningKey(seed_bytes))
        raise ValueError(f"Seed must be 32 bytes, got {len(seed_bytes)}")

    @classmethod
    def from_env(cls, env_var: str = "STEMEDB_AGENT_SEED") -> "Signer":
        """
        Build a Signer from a hex seed stored in an environment variable.

        Args:
            env_var: Name of the variable holding the hex-encoded 32-byte seed.

        Raises:
            ValueError: If the variable is missing or empty, or if its value
                is rejected by ``from_hex``.
        """
        hex_seed = os.environ.get(env_var)
        if hex_seed:
            return cls.from_hex(hex_seed)
        # An empty value is reported the same way as a missing variable.
        raise ValueError(f"Environment variable {env_var} not set")

    @property
    def public_key(self) -> str:
        """
        Hex-encoded Ed25519 public key (32 bytes).

        This value is what StemeDB uses as the agent_id.
        """
        raw_public = self._verify_key.encode()
        return raw_public.hex()

    @property
    def seed(self) -> str:
        """
        Hex-encoded Ed25519 private key seed (32 bytes).

        Treat the result as a secret; it is what ``from_hex`` needs to
        recreate this identity.
        """
        # Only the first 32 bytes of the key's byte form are exposed.
        raw_seed = bytes(self._signing_key)[:32]
        return raw_seed.hex()

    def sign(self, message: bytes) -> str:
        """
        Sign message with this key and return the signature as hex.

        Only the detached 64-byte signature is returned, not the signed
        message that PyNaCl produces alongside it.
        """
        return self._signing_key.sign(message).signature.hex()

    def create_signature(self, subject: str, predicate: str) -> SignatureEntry:
        """
        Produce a v1 (legacy format) SignatureEntry for an assertion.

        The signed bytes are the UTF-8 encoding of "{subject}:{predicate}".
        Nothing else about the assertion is covered by the signature, so
        other fields can be tampered with undetected. The entry's timestamp
        is the current Unix epoch second.
        """
        canonical = f"{subject}:{predicate}".encode("utf-8")
        issued_at = int(time.time())
        agent_id = self.public_key
        signature_hex = self.sign(canonical)
        return SignatureEntry(
            agent_id=agent_id,
            signature=signature_hex,
            timestamp=issued_at,
            version=1,
        )
def verify(public_key_hex: str, signature_hex: str, message: bytes) -> bool:
    """
    Check an Ed25519 signature against a message and public key.

    Intended for validating signatures that arrive from other agents.

    Args:
        public_key_hex: Hex-encoded 32-byte Ed25519 public key.
        signature_hex: Hex-encoded 64-byte Ed25519 signature.
        message: The exact bytes that were signed.

    Returns:
        True if the signature is valid, False if PyNaCl rejects it with
        BadSignatureError.

    Raises:
        ValueError: If either hex string fails to decode, or decodes to the
            wrong number of bytes.
    """
    try:
        # Decode both inputs together so either failure gets the same error.
        public_key_bytes, signature_bytes = map(
            bytes.fromhex, (public_key_hex, signature_hex)
        )
    except ValueError as e:
        raise ValueError(f"Invalid hex: {e}") from e

    if len(public_key_bytes) != 32:
        raise ValueError(f"Public key must be 32 bytes, got {len(public_key_bytes)}")
    if len(signature_bytes) != 64:
        raise ValueError(f"Signature must be 64 bytes, got {len(signature_bytes)}")

    checker = VerifyKey(public_key_bytes)
    try:
        checker.verify(message, signature_bytes)
    except BadSignatureError:
        return False
    return True