tidaldb/applications/iknowyou/lib/observer.ts
jordan eca7765e8d fix: heal_region re-delivers missed WAL batches so partitioned followers converge immediately after heal
- Extract redeliver_missed(tx, db, log) helper into cluster_transport.rs
- heal_region now removes partition then immediately ships any missed
  batch-log entries to the healed follower's channel
- await_convergence refactored to call the same helper (no logic change)
- tidal-server: reload_text_index before search in cluster mode
- tidal-server: write_signal returns Result instead of panicking on unknown signal
- tidal-server: leader shows lag_events=0 (writes directly, no receiver thread)
- tidal-server: fix cluster mode error propagation (ServerError::from)
- docs/runbooks/cluster.md: add full cluster operations runbook
- docker/: add Dockerfile for containerised cluster deployment
- README.md: add tidal-server HTTP API getting-started section
- Split oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 11:57:01 -07:00

354 lines
12 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { complete } from "./vllm";
import { remember } from "./synap";
import type {
ObserverOutput,
EngagementSignals,
StyleProfile,
TopicAnalysis,
ConversationDynamics,
SignalMemory,
} from "./types";
// ---------------------------------------------------------------------------
// Tier 1: Structured ObserverOutput extraction (every exchange)
// ---------------------------------------------------------------------------
/**
 * System prompt for Tier 1 extraction: asks the model for a single JSON object
 * with `engagement`, `style`, `topic` and `dynamics` sections.
 *
 * Sent as the system message by extractObserverOutput, which parses the reply
 * with validateObserverOutput and then overwrites `engagement.word_count` with
 * a locally computed count (the model's value for that field is discarded).
 *
 * NOTE(review): the enum choices listed here must stay in sync with the
 * allowed lists passed to validateEnum in validateObserverOutput; any value
 * outside those lists is silently replaced by that field's fallback.
 */
const STRUCTURED_OBSERVER_PROMPT = `You are a communication signal extractor. Analyze a conversation exchange and extract structured signals about how the person communicates.
Extract ONLY what is clearly evidenced. Do not speculate or infer beyond the text.
Respond with a single JSON object (no markdown, no explanation) matching this exact schema:
{
"engagement": {
"replied": true/false,
"substantive": true/false,
"word_count": number,
"sentiment_score": 0.0 to 1.0 (0.5 = neutral),
"sentiment_direction": "positive" | "negative" | "neutral"
},
"style": {
"formality": 0.0 to 1.0 (0 = very casual, 1 = very formal),
"uses_lowercase": true/false,
"uses_jargon": true/false,
"structure": "stream_of_thought" | "structured" | "narrative" | "technical",
"emoji": true/false
},
"topic": {
"primary": "short topic label",
"domain": "broader domain category",
"specificity": "surface" | "intermediate" | "expert",
"continued_from_previous": true/false,
"deepened": true/false
},
"dynamics": {
"redirected": true/false,
"redirect_direction": "description or empty string",
"who_is_leading": "person" | "system",
"built_on_previous": true/false,
"corrected_system": true/false
}
}`;
/** Per-turn metadata passed to extractObserverOutput alongside the exchange text. */
export interface ObserverContext {
/** Included in the prompt as "Turn number: N". */
turnNumber: number;
/**
 * Seconds the person took to reply (presumably measured by the caller —
 * confirm). When provided, it is copied into `engagement.latency_seconds`.
 */
latencySeconds?: number;
/** Topic of the previous exchange; included in the prompt only when non-empty. */
previousTopic?: string;
}
/**
 * Extract a full ObserverOutput from a single assistant → person exchange.
 *
 * Sends the assistant message and the person's reply (plus the turn number
 * and, when set, the previous topic) to the model with
 * STRUCTURED_OBSERVER_PROMPT, then parses and validates the JSON reply.
 *
 * Fields that can be measured locally are never taken from the model:
 * `engagement.word_count` is recomputed from `userMessage`, and
 * `engagement.latency_seconds` comes only from `ctx.latencySeconds`
 * (0, meaning "unknown", when the caller does not supply it).
 *
 * @param userMessage - The person's reply being analysed.
 * @param assistantMessage - The assistant message the person replied to.
 * @param ctx - Turn metadata for the prompt and for locally-computed fields.
 * @returns The validated output, or null when the reply contains no parseable
 *   JSON object (parse failures are logged). Errors thrown by `complete`
 *   itself are not caught here and propagate to the caller.
 */
export async function extractObserverOutput(
  userMessage: string,
  assistantMessage: string,
  ctx: ObserverContext
): Promise<ObserverOutput | null> {
  const contextLines: string[] = [];
  if (ctx.previousTopic) contextLines.push(`Previous topic: ${ctx.previousTopic}`);
  contextLines.push(`Turn number: ${ctx.turnNumber}`);

  const raw = await complete([
    { role: "system", content: STRUCTURED_OBSERVER_PROMPT },
    {
      role: "user",
      content: `${contextLines.join("\n")}\n\nAssistant said: "${assistantMessage}"\nPerson replied: "${userMessage}"`,
    },
  ]);

  try {
    const parsed = extractJson(raw);
    if (!parsed || typeof parsed !== "object") return null;
    const output = validateObserverOutput(parsed);
    // Inject locally-computed fields. The prompt never asks the model for
    // latency, so any latency_seconds it volunteers is fabricated and must be
    // overwritten even when the caller has no measurement (0 = unknown; no
    // latency memory is produced for non-positive values).
    output.engagement.word_count = countWords(userMessage);
    output.engagement.latency_seconds = ctx.latencySeconds ?? 0;
    return output;
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    console.error(`[observer] failed to parse ObserverOutput: ${msg}`, raw.slice(0, 200));
    return null;
  }
}
// ---------------------------------------------------------------------------
// Tier 2: Natural-language observation synthesis (every 5 turns)
// ---------------------------------------------------------------------------
/**
 * System prompt for Tier 2 synthesis: asks the model for a JSON array of 1-3
 * observation strings, or [] when no clear pattern emerges.
 *
 * Sent as the system message by synthesizeObservations, which keeps only the
 * non-empty string items of the returned array; the 1-3 limit stated here is
 * not enforced in code.
 */
const SYNTHESIS_PROMPT = `You are an observation synthesizer. Given structured communication signals from recent exchanges, produce 1-3 concise natural-language observations about this person's communication patterns.
Each observation should be:
- A specific, actionable insight (not vague)
- Based on patterns across multiple exchanges (not single data points)
- Written as a statement about the person, e.g. "They respond fastest to direct technical questions"
Respond with ONLY a JSON array of strings. Example:
["They prefer casual, lowercase text and rarely use emoji", "They tend to lead the conversation toward specific technical problems"]
If no clear patterns emerge, respond with: []`;
/**
 * Ask the model for natural-language observations about how the person
 * communicates, based on structured signals from recent exchanges.
 *
 * Returns [] without calling the model when fewer than three signals are
 * available. Otherwise the model reply is scanned for the span between its
 * first "[" and last "]"; if that span is missing, is not valid JSON, or is not
 * an array, the result is [] (JSON failures are logged). Only non-empty string
 * items are kept. Errors thrown by `complete` itself propagate to the caller.
 *
 * @param recentSignals - Structured outputs of recent exchanges; they are
 *   numbered "Turn 1..N" in the prompt in array order.
 * @param conversationSnippet - Recent raw conversation text appended as context.
 * @returns Observation strings, possibly empty.
 */
export async function synthesizeObservations(
  recentSignals: ObserverOutput[],
  conversationSnippet: string
): Promise<string[]> {
  // Too little evidence to talk about cross-exchange patterns.
  if (recentSignals.length < 3) return [];

  // One indented paragraph per exchange, summarising its key signals.
  const describeTurn = (signal: ObserverOutput, index: number): string => {
    const { style, topic, engagement, dynamics } = signal;
    const lines = [
      `Turn ${index + 1}:`,
      ` style: formality=${style.formality}, lowercase=${style.uses_lowercase}, structure=${style.structure}`,
      ` topic: ${topic.primary} (${topic.domain}, ${topic.specificity})${topic.deepened ? " [deepened]" : ""}`,
      ` sentiment: ${engagement.sentiment_score} (${engagement.sentiment_direction})`,
      ` dynamics: leading=${dynamics.who_is_leading}, built_on_previous=${dynamics.built_on_previous}`,
    ];
    return lines.join("\n");
  };
  const signalSummary = recentSignals.map(describeTurn).join("\n\n");

  const raw = await complete([
    { role: "system", content: SYNTHESIS_PROMPT },
    {
      role: "user",
      content: `Signals from last ${recentSignals.length} exchanges:\n\n${signalSummary}\n\nRecent conversation context:\n${conversationSnippet}`,
    },
  ]);

  try {
    const text = raw.trim();
    const open = text.indexOf("[");
    const close = text.lastIndexOf("]");
    if (open === -1 || close === -1) return [];
    const candidate: unknown = JSON.parse(text.slice(open, close + 1));
    if (!Array.isArray(candidate)) return [];
    return candidate.filter(
      (item): item is string => typeof item === "string" && item.length > 0
    );
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    console.error(`[observer] failed to parse synthesis: ${msg}`, raw.slice(0, 200));
    return [];
  }
}
// ---------------------------------------------------------------------------
// Storage: convert ObserverOutput → SignalMemory[] → Synap
// ---------------------------------------------------------------------------
/**
 * Convert one ObserverOutput into dimension-tagged signal memories.
 *
 * Emits, in order: an engagement/sentiment memory; an engagement/latency
 * memory only when `latency_seconds` > 0 (under 120 s is labelled "fast",
 * otherwise "slow"); then one memory each for style, topic and dynamics.
 * Each memory's tags are `signal:<dimension>`, one `sub:<facet>` tag,
 * `conv:<conversationId>`, and `person:<personId>` when a person id is given.
 *
 * Sentiment confidence grows with distance from neutral (0.5): for scores in
 * [0, 1] it ranges from 0.5 at neutral to 0.9 at either extreme. All other
 * confidences are fixed constants.
 *
 * @param output - Observer output for one exchange.
 * @param conversationId - Used for the `conv:` tag.
 * @param personId - Optional; adds a `person:` tag when truthy.
 * @returns The memories; every memory gets its own `tags` array.
 */
export function outputToSignalMemories(
  output: ObserverOutput,
  conversationId: string,
  personId?: string
): SignalMemory[] {
  const { engagement, style, topic, dynamics } = output;

  // Tags scoping every memory to this conversation (and person, when known).
  const scopeTags: string[] = [`conv:${conversationId}`];
  if (personId) scopeTags.push(`person:${personId}`);
  const tagsFor = (signal: string, facet: string): string[] => [
    `signal:${signal}`,
    `sub:${facet}`,
    ...scopeTags,
  ];

  const direction = engagement.sentiment_direction;
  const distanceFromNeutral = Math.abs(engagement.sentiment_score - 0.5) * 2;
  const sentimentMemory: SignalMemory = {
    dimension: "engagement",
    content: `sentiment: ${engagement.sentiment_score.toFixed(2)} (${direction}), substantive: ${engagement.substantive}, words: ${engagement.word_count}`,
    confidence: 0.5 + distanceFromNeutral * 0.4, // 0.5–0.9 for scores in [0, 1]
    tags: tagsFor("engagement", `sentiment_${direction}`),
  };

  // Latency is only reported when a positive measurement exists.
  const latencyMemories: SignalMemory[] = [];
  if (engagement.latency_seconds > 0) {
    const speed = engagement.latency_seconds < 120 ? "fast" : "slow";
    latencyMemories.push({
      dimension: "engagement",
      content: `response latency: ${engagement.latency_seconds}s (${speed})`,
      confidence: 0.7,
      tags: tagsFor("engagement", `latency_${speed}`),
    });
  }

  const styleMemory: SignalMemory = {
    dimension: "style",
    content: `formality: ${style.formality.toFixed(2)}, lowercase: ${style.uses_lowercase}, jargon: ${style.uses_jargon}, structure: ${style.structure}, emoji: ${style.emoji}`,
    confidence: 0.75,
    tags: tagsFor("style", "profile"),
  };

  const topicMemory: SignalMemory = {
    dimension: "topic",
    content: `topic: ${topic.primary} (${topic.domain}, ${topic.specificity})${topic.deepened ? " [deepened]" : ""}${topic.continued_from_previous ? " [continued]" : ""}`,
    confidence: 0.8,
    tags: tagsFor("topic", `${topic.domain}`),
  };

  const dynamicsMemory: SignalMemory = {
    dimension: "dynamics",
    content: `leading: ${dynamics.who_is_leading}, built_on_previous: ${dynamics.built_on_previous}${dynamics.redirected ? `, redirected: ${dynamics.redirect_direction}` : ""}${dynamics.corrected_system ? ", corrected system" : ""}`,
    confidence: 0.7,
    tags: tagsFor("dynamics", `${dynamics.who_is_leading}_leads`),
  };

  return [sentimentMemory, ...latencyMemories, styleMemory, topicMemory, dynamicsMemory];
}
/**
 * Store signal memories in Synap as semantic memories.
 *
 * All writes are started before any is awaited. A failed write is logged with
 * the memory's dimension and does not stop the other writes or reject the
 * returned promise.
 *
 * @param memories - Memories to persist; an empty list is a no-op.
 */
export async function storeSignals(
  memories: SignalMemory[]
): Promise<void> {
  if (!memories.length) return;
  console.log(`[observer] storing ${memories.length} signal memories`);
  const promises = memories.map((mem) =>
    remember(mem.content, {
      confidence: mem.confidence,
      memoryType: "semantic",
      tags: [...mem.tags],
    }).catch((err: unknown) => {
      // Rejection values are not guaranteed to be Error instances; reading
      // `.message` directly would log `undefined` (or throw for null).
      const msg = err instanceof Error ? err.message : String(err);
      console.error("[observer] failed to store signal:", mem.dimension, msg);
    })
  );
  await Promise.allSettled(promises);
}
/**
 * Store synthesized observations in Synap as semantic memories with a fixed
 * confidence of 0.85, tagged `observation`, `synthesized`,
 * `conv:<conversationId>` and, when given, `person:<personId>`.
 *
 * All writes are started before any is awaited. A failed write is logged
 * (with the first 60 characters of the observation) and does not stop the
 * other writes or reject the returned promise.
 *
 * @param observations - Observation strings; an empty list is a no-op.
 * @param conversationId - Used for the `conv:` tag.
 * @param personId - Optional; adds a `person:` tag when truthy.
 */
export async function storeObservations(
  observations: string[],
  conversationId: string,
  personId?: string
): Promise<void> {
  if (!observations.length) return;
  console.log(`[observer] storing ${observations.length} synthesized observations:`, observations);
  const tags = ["observation", "synthesized", `conv:${conversationId}`];
  if (personId) tags.push(`person:${personId}`);
  const promises = observations.map((obs) =>
    remember(obs, {
      confidence: 0.85,
      memoryType: "semantic",
      // Each call gets its own copy of the tag list.
      tags: [...tags],
    }).catch((err: unknown) => {
      // Rejection values are not guaranteed to be Error instances.
      const msg = err instanceof Error ? err.message : String(err);
      console.error("[observer] failed to store observation:", obs.slice(0, 60), msg);
    })
  );
  await Promise.allSettled(promises);
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Extract a JSON object from potentially noisy LLM output (prose or markdown
 * fences around the JSON).
 *
 * Takes the span from the first "{" to the last "}" and parses it.
 *
 * @param raw - Raw model reply.
 * @returns The parsed object, or null when there is no "{ … }" span (including
 *   when the only "}" comes before the first "{") or the parsed value is not a
 *   plain object.
 * @throws SyntaxError when the span is not valid JSON; extractObserverOutput
 *   catches and logs this.
 */
function extractJson(raw: string): Record<string, unknown> | null {
  const trimmed = raw.trim();
  const start = trimmed.indexOf("{");
  const end = trimmed.lastIndexOf("}");
  // `end < start` covers end === -1 and a stray "}" preceding the first "{",
  // which would otherwise slice to "" and make JSON.parse throw.
  if (start === -1 || end < start) return null;
  const parsed: unknown = JSON.parse(trimmed.slice(start, end + 1));
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) return null;
  return parsed as Record<string, unknown>;
}
/** Count words in `text`, where a word is a maximal run of non-whitespace characters. */
function countWords(text: string): number {
  const words = text.match(/\S+/g);
  return words === null ? 0 : words.length;
}
/**
 * Bound `val` to the closed range [min, max].
 * NaN passes through unchanged; if min > max the result is max.
 */
function clamp(val: number, min: number, max: number): number {
  const atLeastMin = Math.max(min, val);
  return Math.min(max, atLeastMin);
}
/**
 * Validate a raw parsed ObserverOutput and fill defaults for anything missing
 * or malformed, so downstream code can rely on every field being present.
 * Bad field values never throw; each falls back to a fixed default.
 *
 * - Numbers fall back only when missing or non-numeric, so a legitimate 0
 *   (e.g. `sentiment_score: 0` = strongly negative, `formality: 0` = very
 *   casual) is kept rather than replaced by the 0.5 default. Scores are
 *   clamped to [0, 1].
 * - Booleans accept real booleans, numbers (by truthiness) and the strings
 *   "true"/"false" (so a stringified "false" reads as false); anything else
 *   uses the field default.
 * - Enum fields outside their allowed lists use a fixed default.
 * - Text fields fall back when missing, blank, or not a primitive.
 *
 * @param raw - Parsed JSON object from the model reply.
 * @returns A fully populated ObserverOutput.
 */
function validateObserverOutput(raw: Record<string, unknown>): ObserverOutput {
  const eng = (raw.engagement ?? {}) as Record<string, unknown>;
  const sty = (raw.style ?? {}) as Record<string, unknown>;
  const top = (raw.topic ?? {}) as Record<string, unknown>;
  const dyn = (raw.dynamics ?? {}) as Record<string, unknown>;

  const engagement: EngagementSignals = {
    replied: coerceBoolean(eng.replied, true),
    latency_seconds: coerceNumber(eng.latency_seconds, 0),
    substantive: coerceBoolean(eng.substantive, false),
    word_count: coerceNumber(eng.word_count, 0),
    sentiment_score: clamp(coerceNumber(eng.sentiment_score, 0.5), 0, 1),
    sentiment_direction: validateEnum(
      eng.sentiment_direction,
      ["positive", "negative", "neutral"],
      "neutral"
    ),
  };

  const style: StyleProfile = {
    formality: clamp(coerceNumber(sty.formality, 0.5), 0, 1),
    uses_lowercase: coerceBoolean(sty.uses_lowercase, false),
    uses_jargon: coerceBoolean(sty.uses_jargon, false),
    structure: validateEnum(
      sty.structure,
      ["stream_of_thought", "structured", "narrative", "technical"],
      "stream_of_thought"
    ),
    emoji: coerceBoolean(sty.emoji, false),
  };

  const topic: TopicAnalysis = {
    primary: coerceText(top.primary, "general"),
    domain: coerceText(top.domain, "general"),
    specificity: validateEnum(
      top.specificity,
      ["surface", "intermediate", "expert"],
      "surface"
    ),
    continued_from_previous: coerceBoolean(top.continued_from_previous, false),
    deepened: coerceBoolean(top.deepened, false),
  };

  const dynamics: ConversationDynamics = {
    redirected: coerceBoolean(dyn.redirected, false),
    redirect_direction: coerceText(dyn.redirect_direction, ""),
    who_is_leading: validateEnum(
      dyn.who_is_leading,
      ["person", "system"],
      "system"
    ),
    built_on_previous: coerceBoolean(dyn.built_on_previous, false),
    corrected_system: coerceBoolean(dyn.corrected_system, false),
  };

  return { engagement, style, topic, dynamics };
}

/** Finite number from a JSON number or numeric string; otherwise `fallback` (0 is kept). */
function coerceNumber(value: unknown, fallback: number): number {
  let n: number;
  if (typeof value === "number") {
    n = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    n = Number(value);
  } else {
    return fallback;
  }
  return Number.isFinite(n) ? n : fallback;
}

/** Boolean from a boolean, a number (truthiness), or the strings "true"/"false"; otherwise `fallback`. */
function coerceBoolean(value: unknown, fallback: boolean): boolean {
  if (typeof value === "boolean") return value;
  if (typeof value === "number") return Boolean(value);
  if (typeof value === "string") {
    const normalized = value.trim().toLowerCase();
    if (normalized === "true") return true;
    if (normalized === "false") return false;
  }
  return fallback;
}

/** Non-blank string as-is, numbers/booleans stringified; otherwise `fallback` (avoids "[object Object]"). */
function coerceText(value: unknown, fallback: string): string {
  if (typeof value === "string") return value.trim() === "" ? fallback : value;
  if (typeof value === "number" || typeof value === "boolean") return String(value);
  return fallback;
}
/**
 * Return `value` if it is exactly one of the `allowed` strings, otherwise
 * `fallback`. Matching is exact (case- and whitespace-sensitive).
 *
 * @param value - Untrusted input, typically a field from parsed model JSON.
 * @param allowed - Permitted values; readonly arrays such as `as const`
 *   tuples are accepted because the list is never mutated.
 * @param fallback - Returned when `value` is not a string or not in `allowed`.
 * @returns `value` narrowed to `T`, or `fallback`.
 */
function validateEnum<T extends string>(
  value: unknown,
  allowed: readonly T[],
  fallback: T
): T {
  return typeof value === "string" && (allowed as readonly string[]).includes(value)
    ? (value as T)
    : fallback;
}