Major additions: - Community Next.js app (port 18187) for browsing claims with API docs - stemedb-chaos crate: Fault injection, chaos testing, CRDT properties - Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents - Disputed claims handling: Manual review workflows and validation - Aphoria security scanner: New extractors (SQL injection, command injection, weak crypto, TLS version), policy-based ignores, UAT reports - Docker infrastructure: Dockerfile, docker-compose.yml for full stack - VulnBank demo: Intentionally vulnerable multi-language test corpus SDK & API enhancements: - Source registry handlers for tracking data provenance - Metrics endpoint - Skeptic filtering improvements Code quality: - Split 14 large files (>500 lines) into focused modules - All files now under 500-line limit per project guidelines Documentation: - Chaos testing guide, circuit breakers, observability docs - Phase 7 UAT documentation updates - Martin Kleppmann technical writer agent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
167 lines
4.9 KiB
Python
167 lines
4.9 KiB
Python
"""
|
|
Ed25519 signing for StemeDB assertions.
|
|
|
|
Mirrors the Go SDK pattern from sdk/go/steme/signer.go
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
from nacl.signing import SigningKey, VerifyKey
|
|
from nacl.exceptions import BadSignatureError
|
|
|
|
|
|
@dataclass
class SignatureEntry:
    """
    A single agent signature attached to a StemeDB assertion.

    Attributes:
        agent_id: Hex-encoded 32-byte public key of the signing agent.
        signature: Hex-encoded 64-byte Ed25519 signature.
        timestamp: Unix epoch timestamp.
        version: Signature format version (1 = v1 legacy).
    """

    agent_id: str
    signature: str
    timestamp: int
    version: int
|
|
|
|
|
|
class Signer:
    """
    Signs StemeDB assertions with an Ed25519 key.

    Every assertion submitted to StemeDB needs a signature from at least one
    agent. This class hides the cryptographic plumbing behind a small API.
    """

    def __init__(self, signing_key: SigningKey):
        """
        Wrap an existing Ed25519 signing key.

        Prefer the alternate constructors (generate, from_hex, from_env)
        over calling this directly.
        """
        self._signing_key = signing_key
        # Cache the matching verify key; public_key is derived from it.
        self._verify_key = signing_key.verify_key

    @classmethod
    def generate(cls) -> "Signer":
        """
        Build a Signer around a freshly generated random keypair.

        Handy for tests or for minting a new agent identity. Read the
        seed property afterwards if the key needs to be persisted.
        """
        fresh_key = SigningKey.generate()
        return cls(fresh_key)

    @classmethod
    def from_hex(cls, hex_seed: str) -> "Signer":
        """
        Build a Signer from a hex-encoded seed.

        The decoded seed must be exactly 32 bytes (64 hex characters).

        Raises:
            ValueError: If hex_seed is not valid hex or decodes to a
                length other than 32 bytes.
        """
        try:
            raw_seed = bytes.fromhex(hex_seed)
        except ValueError as exc:
            raise ValueError(f"Invalid hex string: {exc}") from exc

        seed_len = len(raw_seed)
        if seed_len != 32:
            raise ValueError(f"Seed must be 32 bytes, got {seed_len}")

        return cls(SigningKey(raw_seed))

    @classmethod
    def from_env(cls, env_var: str = "STEMEDB_AGENT_SEED") -> "Signer":
        """
        Build a Signer from a seed held in an environment variable.

        The variable's value is passed to from_hex, so it must be a
        hex-encoded 32-byte seed.

        Raises:
            ValueError: If the variable is missing or empty, or if its
                value is rejected by from_hex.
        """
        configured = os.getenv(env_var)
        if configured:
            return cls.from_hex(configured)

        raise ValueError(f"Environment variable {env_var} not set")

    @property
    def public_key(self) -> str:
        """
        Hex-encoded Ed25519 public key (32 bytes).

        This value is used as the agent_id in StemeDB.
        """
        encoded = self._verify_key.encode()
        return encoded.hex()

    @property
    def seed(self) -> str:
        """
        Hex-encoded Ed25519 private key seed (32 bytes).

        Keep this secret; store it securely for key recovery.
        """
        key_hex = bytes(self._signing_key).hex()
        # Keep at most the first 64 hex characters, i.e. 32 bytes.
        return key_hex[:64]

    def sign(self, message: bytes) -> str:
        """
        Sign the given message bytes with Ed25519.

        Returns the signature as a hex string (64 bytes when decoded).
        """
        # Only the detached signature part of the signed result is returned.
        return self._signing_key.sign(message).signature.hex()

    def create_signature(self, subject: str, predicate: str) -> SignatureEntry:
        """
        Produce a SignatureEntry for an assertion (v1 legacy format).

        The signed message is the canonical serialization
        "{subject}:{predicate}" encoded as UTF-8. Only subject and
        predicate are covered, so other assertion fields are not
        protected against tampering.

        The entry's timestamp is the current Unix epoch in whole seconds.
        """
        canonical = f"{subject}:{predicate}"
        signed_at = int(time.time())
        signature_hex = self.sign(canonical.encode("utf-8"))

        return SignatureEntry(
            agent_id=self.public_key,
            signature=signature_hex,
            timestamp=signed_at,
            version=1,
        )
|
|
|
|
|
|
def verify(public_key_hex: str, signature_hex: str, message: bytes) -> bool:
    """
    Check an Ed25519 signature against a message.

    Intended for validating signatures received from other agents.

    Returns:
        True when the signature matches the message under the given
        public key, False when verification fails.

    Raises:
        ValueError: If either hex string is malformed, the public key is
            not 32 bytes, or the signature is not 64 bytes.
    """
    try:
        key_raw = bytes.fromhex(public_key_hex)
        sig_raw = bytes.fromhex(signature_hex)
    except ValueError as exc:
        raise ValueError(f"Invalid hex: {exc}") from exc

    # Reject wrong-sized inputs before the key object is built.
    key_len = len(key_raw)
    if key_len != 32:
        raise ValueError(f"Public key must be 32 bytes, got {key_len}")

    sig_len = len(sig_raw)
    if sig_len != 64:
        raise ValueError(f"Signature must be 64 bytes, got {sig_len}")

    checker = VerifyKey(key_raw)
    try:
        checker.verify(message, sig_raw)
    except BadSignatureError:
        # A mismatching signature is an ordinary negative result, not an error.
        return False
    else:
        return True
|