| | """ |
| | Diagnostics - Interpretability Tracing Framework |
| | |
| | This module implements the diagnostic tracing framework for agent interpretability |
| | and symbolic recursion visualization throughout the AGI-HEDGE-FUND system. |
| | |
| | Key capabilities: |
| | - Signal tracing for attribution flows |
| | - Reasoning state visualization |
| | - Consensus graph generation |
| | - Agent conflict mapping |
| | - Failure mode detection |
| | - Shell-based recursive diagnostic patterns |
| | |
| | Internal Note: The diagnostic framework encodes the symbolic interpretability shells, |
| | enabling deeper introspection into agent cognition and emergent patterns. |
| | """ |
| |
|
| | import datetime |
| | import uuid |
| | import logging |
| | import os |
| | import json |
| | from typing import Dict, List, Any, Optional, Union, Set, Tuple |
| | import traceback |
| | from collections import defaultdict |
| | import numpy as np |
| | import re |
| | from enum import Enum |
| | from pathlib import Path |
| |
|
| |
|
| | class TracingMode(Enum): |
| | """Tracing modes for diagnostic tools.""" |
| | DISABLED = "disabled" |
| | MINIMAL = "minimal" |
| | DETAILED = "detailed" |
| | COMPREHENSIVE = "comprehensive" |
| | SYMBOLIC = "symbolic" |
| |
|
| |
|
| | class DiagnosticLevel(Enum): |
| | """Diagnostic levels for trace items.""" |
| | INFO = "info" |
| | WARNING = "warning" |
| | ERROR = "error" |
| | COLLAPSE = "collapse" |
| | RECURSION = "recursion" |
| | SYMBOLIC = "symbolic" |
| |
|
| |
|
| | class ShellPattern(Enum): |
| | """Interpretability shell patterns.""" |
| | NULL_FEATURE = "v03 NULL-FEATURE" |
| | CIRCUIT_FRAGMENT = "v07 CIRCUIT-FRAGMENT" |
| | META_FAILURE = "v10 META-FAILURE" |
| | GHOST_FRAME = "v20 GHOST-FRAME" |
| | ECHO_ATTRIBUTION = "v53 ECHO-ATTRIBUTION" |
| | ATTRIBUTION_REFLECT = "v60 ATTRIBUTION-REFLECT" |
| | INVERSE_CHAIN = "v50 INVERSE-CHAIN" |
| | RECURSIVE_FRACTURE = "v12 RECURSIVE-FRACTURE" |
| | ETHICAL_INVERSION = "v301 ETHICAL-INVERSION" |
| | RESIDUAL_ALIGNMENT_DRIFT = "v152 RESIDUAL-ALIGNMENT-DRIFT" |
| |
|
| |
|
| | class TracingTools: |
| | """ |
| | Diagnostic tracing framework for model interpretability. |
| | |
| | The TracingTools provides: |
| | - Signal tracing for understanding attribution flows |
| | - Reasoning state visualization for debugging complex logic |
| | - Consensus graph generation for multi-agent coordination |
| | - Agent conflict mapping for identifying disagreements |
| | - Failure mode detection for reliability analysis |
| | """ |
| | |
| | def __init__( |
| | self, |
| | agent_id: str, |
| | agent_name: str, |
| | tracing_mode: TracingMode = TracingMode.MINIMAL, |
| | trace_dir: Optional[str] = None, |
| | trace_limit: int = 10000, |
| | ): |
| | """ |
| | Initialize tracing tools. |
| | |
| | Args: |
| | agent_id: ID of agent being traced |
| | agent_name: Name of agent being traced |
| | tracing_mode: Tracing mode |
| | trace_dir: Directory to save traces |
| | trace_limit: Maximum number of trace items to keep in memory |
| | """ |
| | self.agent_id = agent_id |
| | self.agent_name = agent_name |
| | self.tracing_mode = tracing_mode |
| | self.trace_dir = trace_dir |
| | self.trace_limit = trace_limit |
| | |
| | |
| | if trace_dir: |
| | os.makedirs(trace_dir, exist_ok=True) |
| | |
| | |
| | self.traces = [] |
| | self.trace_index = {} |
| | self.signal_traces = [] |
| | self.reasoning_traces = [] |
| | self.collapse_traces = [] |
| | self.shell_traces = [] |
| | |
| | |
| | self.shell_patterns = {} |
| | self._initialize_shell_patterns() |
| | |
| | |
| | self.stats = { |
| | "total_traces": 0, |
| | "signal_traces": 0, |
| | "reasoning_traces": 0, |
| | "collapse_traces": 0, |
| | "shell_traces": 0, |
| | "warnings": 0, |
| | "errors": 0, |
| | } |
| | |
| | def _initialize_shell_patterns(self) -> None: |
| | """Initialize shell pattern detection rules.""" |
| | |
| | self.shell_patterns[ShellPattern.NULL_FEATURE] = { |
| | "pattern": r"knowledge.*boundary|knowledge.*gap|unknown|uncertain", |
| | "confidence_threshold": 0.3, |
| | "belief_gap_threshold": 0.7, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.CIRCUIT_FRAGMENT] = { |
| | "pattern": r"broken.*path|attribution.*break|logical.*gap|incomplete.*reasoning", |
| | "step_break_threshold": 0.5, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.META_FAILURE] = { |
| | "pattern": r"meta.*failure|recursive.*loop|self.*reference|recursive.*error", |
| | "recursion_depth_threshold": 3, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.GHOST_FRAME] = { |
| | "pattern": r"agent.*identity|residual.*frame|persistent.*identity|agent.*trace", |
| | "identity_threshold": 0.6, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.ECHO_ATTRIBUTION] = { |
| | "pattern": r"causal.*chain|attribution.*path|decision.*trace|backpropagation", |
| | "path_length_threshold": 3, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.ATTRIBUTION_REFLECT] = { |
| | "pattern": r"multi.*head|contribution.*analysis|attention.*weights|attribution.*weighting", |
| | "head_count_threshold": 2, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.INVERSE_CHAIN] = { |
| | "pattern": r"mismatch|output.*attribution|attribution.*mismatch|inconsistent.*output", |
| | "mismatch_threshold": 0.5, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.RECURSIVE_FRACTURE] = { |
| | "pattern": r"circular.*reasoning|loop.*detection|recursive.*fracture|circular.*attribution", |
| | "loop_length_threshold": 2, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.ETHICAL_INVERSION] = { |
| | "pattern": r"value.*inversion|ethical.*reversal|principle.*conflict|value.*contradiction", |
| | "polarity_threshold": 0.7, |
| | } |
| | |
| | |
| | self.shell_patterns[ShellPattern.RESIDUAL_ALIGNMENT_DRIFT] = { |
| | "pattern": r"belief.*drift|alignment.*shift|value.*drift|gradual.*change", |
| | "drift_magnitude_threshold": 0.3, |
| | } |
| | |
| | def record_trace(self, trace_type: str, content: Dict[str, Any], |
| | level: DiagnosticLevel = DiagnosticLevel.INFO) -> str: |
| | """ |
| | Record a general trace item. |
| | |
| | Args: |
| | trace_type: Type of trace |
| | content: Trace content |
| | level: Diagnostic level |
| | |
| | Returns: |
| | Trace ID |
| | """ |
| | |
| | if self.tracing_mode == TracingMode.DISABLED: |
| | return "" |
| | |
| | |
| | trace_id = str(uuid.uuid4()) |
| | timestamp = datetime.datetime.now() |
| | |
| | trace_item = { |
| | "trace_id": trace_id, |
| | "agent_id": self.agent_id, |
| | "agent_name": self.agent_name, |
| | "trace_type": trace_type, |
| | "level": level.value, |
| | "content": content, |
| | "timestamp": timestamp.isoformat(), |
| | } |
| | |
| | |
| | shell_patterns = self._detect_shell_patterns(trace_type, content) |
| | if shell_patterns: |
| | trace_item["shell_patterns"] = shell_patterns |
| | self.shell_traces.append(trace_id) |
| | self.stats["shell_traces"] += 1 |
| | |
| | |
| | self.traces.append(trace_item) |
| | self.trace_index[trace_id] = len(self.traces) - 1 |
| | |
| | |
| | if trace_type == "signal": |
| | self.signal_traces.append(trace_id) |
| | self.stats["signal_traces"] += 1 |
| | elif trace_type == "reasoning": |
| | self.reasoning_traces.append(trace_id) |
| | self.stats["reasoning_traces"] += 1 |
| | elif trace_type == "collapse": |
| | self.collapse_traces.append(trace_id) |
| | self.stats["collapse_traces"] += 1 |
| | |
| | |
| | self.stats["total_traces"] += 1 |
| | if level == DiagnosticLevel.WARNING: |
| | self.stats["warnings"] += 1 |
| | elif level == DiagnosticLevel.ERROR: |
| | self.stats["errors"] += 1 |
| | |
| | |
| | if self.trace_dir: |
| | self._save_trace_to_file(trace_item) |
| | |
| | |
| | if len(self.traces) > self.trace_limit: |
| | |
| | oldest_trace = self.traces.pop(0) |
| | del self.trace_index[oldest_trace["trace_id"]] |
| | |
| | |
| | self.trace_index = {trace_id: i for i, trace in enumerate(self.traces) |
| | for trace_id in [trace["trace_id"]]} |
| | |
| | return trace_id |
| | |
| | def record_signal(self, signal: Any) -> str: |
| | """ |
| | Record a signal trace. |
| | |
| | Args: |
| | signal: Signal to record |
| | |
| | Returns: |
| | Trace ID |
| | """ |
| | |
| | if hasattr(signal, "dict"): |
| | signal_dict = signal.dict() |
| | elif isinstance(signal, dict): |
| | signal_dict = signal |
| | else: |
| | signal_dict = {"signal": str(signal)} |
| | |
| | |
| | if "timestamp" not in signal_dict: |
| | signal_dict["timestamp"] = datetime.datetime.now().isoformat() |
| | |
| | |
| | return self.record_trace("signal", signal_dict) |
| | |
| | def record_reasoning(self, reasoning_state: Dict[str, Any], |
| | level: DiagnosticLevel = DiagnosticLevel.INFO) -> str: |
| | """ |
| | Record a reasoning trace. |
| | |
| | Args: |
| | reasoning_state: Reasoning state |
| | level: Diagnostic level |
| | |
| | Returns: |
| | Trace ID |
| | """ |
| | |
| | return self.record_trace("reasoning", reasoning_state, level) |
| | |
| | def record_collapse(self, collapse_type: str, collapse_reason: str, |
| | details: Dict[str, Any]) -> str: |
| | """ |
| | Record a collapse trace. |
| | |
| | Args: |
| | collapse_type: Type of collapse |
| | collapse_reason: Reason for collapse |
| | details: Collapse details |
| | |
| | Returns: |
| | Trace ID |
| | """ |
| | |
| | collapse_content = { |
| | "collapse_type": collapse_type, |
| | "collapse_reason": collapse_reason, |
| | "details": details, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | |
| | return self.record_trace("collapse", collapse_content, DiagnosticLevel.COLLAPSE) |
| | |
| | def record_shell_trace(self, shell_pattern: ShellPattern, content: Dict[str, Any]) -> str: |
| | """ |
| | Record a shell pattern trace. |
| | |
| | Args: |
| | shell_pattern: Shell pattern |
| | content: Trace content |
| | |
| | Returns: |
| | Trace ID |
| | """ |
| | |
| | shell_content = { |
| | "shell_pattern": shell_pattern.value, |
| | "content": content, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | |
| | return self.record_trace("shell", shell_content, DiagnosticLevel.SYMBOLIC) |
| | |
| | def get_trace(self, trace_id: str) -> Optional[Dict[str, Any]]: |
| | """ |
| | Get trace by ID. |
| | |
| | Args: |
| | trace_id: Trace ID |
| | |
| | Returns: |
| | Trace item or None if not found |
| | """ |
| | if trace_id not in self.trace_index: |
| | return None |
| | |
| | return self.traces[self.trace_index[trace_id]] |
| | |
| | def get_traces_by_type(self, trace_type: str, limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get traces by type. |
| | |
| | Args: |
| | trace_type: Trace type |
| | limit: Maximum number of traces to return |
| | |
| | Returns: |
| | List of trace items |
| | """ |
| | if trace_type == "signal": |
| | trace_ids = self.signal_traces[-limit:] |
| | elif trace_type == "reasoning": |
| | trace_ids = self.reasoning_traces[-limit:] |
| | elif trace_type == "collapse": |
| | trace_ids = self.collapse_traces[-limit:] |
| | elif trace_type == "shell": |
| | trace_ids = self.shell_traces[-limit:] |
| | else: |
| | |
| | trace_ids = [trace["trace_id"] for trace in self.traces |
| | if trace["trace_type"] == trace_type][-limit:] |
| | |
| | |
| | return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| | |
| | def get_traces_by_level(self, level: DiagnosticLevel, limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get traces by diagnostic level. |
| | |
| | Args: |
| | level: Diagnostic level |
| | limit: Maximum number of traces to return |
| | |
| | Returns: |
| | List of trace items |
| | """ |
| | |
| | trace_ids = [trace["trace_id"] for trace in self.traces |
| | if trace.get("level") == level.value][-limit:] |
| | |
| | |
| | return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| | |
| | def get_shell_traces(self, shell_pattern: Optional[ShellPattern] = None, |
| | limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get shell pattern traces. |
| | |
| | Args: |
| | shell_pattern: Optional specific shell pattern |
| | limit: Maximum number of traces to return |
| | |
| | Returns: |
| | List of trace items |
| | """ |
| | if shell_pattern: |
| | |
| | trace_ids = [] |
| | for trace in self.traces: |
| | if "shell_patterns" in trace and shell_pattern.value in trace["shell_patterns"]: |
| | trace_ids.append(trace["trace_id"]) |
| | |
| | |
| | trace_ids = trace_ids[-limit:] |
| | else: |
| | |
| | trace_ids = self.shell_traces[-limit:] |
| | |
| | |
| | return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| | |
| | def get_trace_stats(self) -> Dict[str, Any]: |
| | """ |
| | Get trace statistics. |
| | |
| | Returns: |
| | Trace statistics |
| | """ |
| | |
| | shell_pattern_stats = {} |
| | for shell_pattern in ShellPattern: |
| | count = sum(1 for trace in self.traces |
| | if "shell_patterns" in trace and shell_pattern.value in trace["shell_patterns"]) |
| | |
| | shell_pattern_stats[shell_pattern.value] = count |
| | |
| | |
| | stats = { |
| | **self.stats, |
| | "shell_patterns": shell_pattern_stats, |
| | } |
| | |
| | return stats |
| | |
| | def clear_traces(self) -> int: |
| | """ |
| | Clear all traces. |
| | |
| | Returns: |
| | Number of traces cleared |
| | """ |
| | trace_count = len(self.traces) |
| | |
| | |
| | self.traces = [] |
| | self.trace_index = {} |
| | self.signal_traces = [] |
| | self.reasoning_traces = [] |
| | self.collapse_traces = [] |
| | self.shell_traces = [] |
| | |
| | |
| | self.stats = { |
| | "total_traces": 0, |
| | "signal_traces": 0, |
| | "reasoning_traces": 0, |
| | "collapse_traces": 0, |
| | "shell_traces": 0, |
| | "warnings": 0, |
| | "errors": 0, |
| | } |
| | |
| | return trace_count |
| | |
| | def _detect_shell_patterns(self, trace_type: str, content: Dict[str, Any]) -> List[str]: |
| | """ |
| | Detect shell patterns in trace content. |
| | |
| | Args: |
| | trace_type: Trace type |
| | content: Trace content |
| | |
| | Returns: |
| | List of detected shell patterns |
| | """ |
| | detected_patterns = [] |
| | |
| | |
| | content_str = json.dumps(content, ensure_ascii=False).lower() |
| | |
| | |
| | for shell_pattern, pattern_rules in self.shell_patterns.items(): |
| | pattern = pattern_rules["pattern"] |
| | |
| | |
| | if re.search(pattern, content_str, re.IGNORECASE): |
| | |
| | if self._validate_pattern_rules(shell_pattern, pattern_rules, content): |
| | detected_patterns.append(shell_pattern.value) |
| | |
| | return detected_patterns |
| | |
| | def _validate_pattern_rules(self, shell_pattern: ShellPattern, |
| | pattern_rules: Dict[str, Any], |
| | content: Dict[str, Any]) -> bool: |
| | """ |
| | Validate additional pattern rules. |
| | |
| | Args: |
| | shell_pattern: Shell pattern |
| | pattern_rules: Pattern rules |
| | content: Trace content |
| | |
| | Returns: |
| | True if pattern rules are validated |
| | """ |
| | |
| | if shell_pattern == ShellPattern.NULL_FEATURE: |
| | |
| | if "confidence" in content and content["confidence"] < pattern_rules["confidence_threshold"]: |
| | return True |
| | |
| | |
| | if "belief_state" in content: |
| | belief_values = list(content["belief_state"].values()) |
| | if belief_values and max(belief_values) - min(belief_values) > pattern_rules["belief_gap_threshold"]: |
| | return True |
| | |
| | elif shell_pattern == ShellPattern.CIRCUIT_FRAGMENT: |
| | |
| | if "steps" in content: |
| | steps = content["steps"] |
| | for i in range(len(steps) - 1): |
| | if steps[i].get("completed", True) and not steps[i+1].get("completed", True): |
| | return True |
| | |
| | |
| | if "attribution" in content and content["attribution"].get("attribution_breaks", False): |
| | return True |
| | |
| | elif shell_pattern == ShellPattern.META_FAILURE: |
| | |
| | if "depth" in content and content["depth"] >= pattern_rules["recursion_depth_threshold"]: |
| | return True |
| | |
| | |
| | if "errors" in content and any("meta" in error.get("message", "").lower() for error in content["errors"]): |
| | return True |
| | |
| | elif shell_pattern == ShellPattern.RECURSIVE_FRACTURE: |
| | |
| | if "steps" in content: |
| | steps = content["steps"] |
| | step_names = [step.get("name", "") for step in steps] |
| | |
| | |
| | for pattern_len in range(2, len(step_names) // 2 + 1): |
| | for i in range(len(step_names) - pattern_len * 2 + 1): |
| | pattern = step_names[i:i+pattern_len] |
| | next_seq = step_names[i+pattern_len:i+pattern_len*2] |
| | |
| | if pattern == next_seq: |
| | return True |
| | |
| | elif shell_pattern == ShellPattern.RESIDUAL_ALIGNMENT_DRIFT: |
| | |
| | if "drift_vector" in content: |
| | drift_values = list(content["drift_vector"].values()) |
| | if drift_values and any(abs(val) > pattern_rules["drift_magnitude_threshold"] for val in drift_values): |
| | return True |
| | |
| | |
| | if "drift_detected" in content and content["drift_detected"]: |
| | return True |
| | |
| | |
| | return True |
| | |
| | def _save_trace_to_file(self, trace_item: Dict[str, Any]) -> None: |
| | """ |
| | Save trace to file. |
| | |
| | Args: |
| | trace_item: Trace item |
| | """ |
| | if not self.trace_dir: |
| | return |
| | |
| | try: |
| | |
| | trace_id = trace_item["trace_id"] |
| | trace_type = trace_item["trace_type"] |
| | filename = f"{trace_type}_{trace_id}.json" |
| | filepath = os.path.join(self.trace_dir, filename) |
| | |
| | |
| | with open(filepath, "w") as f: |
| | json.dump(trace_item, f, indent=2) |
| | except Exception as e: |
| | logging.error(f"Error saving trace to file: {e}") |
| | logging.error(traceback.format_exc()) |
| | |
| | def generate_trace_visualization(self, trace_id: str) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for a trace. |
| | |
| | Args: |
| | trace_id: Trace ID |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | trace = self.get_trace(trace_id) |
| | if not trace: |
| | return {"error": "Trace not found"} |
| | |
| | trace_type = trace["trace_type"] |
| | |
| | if trace_type == "signal": |
| | return self._generate_signal_visualization(trace) |
| | elif trace_type == "reasoning": |
| | return self._generate_reasoning_visualization(trace) |
| | elif trace_type == "collapse": |
| | return self._generate_collapse_visualization(trace) |
| | elif trace_type == "shell": |
| | return self._generate_shell_visualization(trace) |
| | else: |
| | return { |
| | "trace_id": trace_id, |
| | "agent_name": trace["agent_name"], |
| | "trace_type": trace_type, |
| | "timestamp": trace["timestamp"], |
| | "content": trace["content"], |
| | } |
| | |
| | def _generate_signal_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for a signal trace. |
| | |
| | Args: |
| | trace: Signal trace |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | content = trace["content"] |
| | |
| | |
| | visualization = { |
| | "trace_id": trace["trace_id"], |
| | "agent_name": trace["agent_name"], |
| | "trace_type": "signal", |
| | "timestamp": trace["timestamp"], |
| | "signal_data": { |
| | "ticker": content.get("ticker", ""), |
| | "action": content.get("action", ""), |
| | "confidence": content.get("confidence", 0), |
| | }, |
| | } |
| | |
| | |
| | if "attribution_trace" in content: |
| | visualization["attribution"] = content["attribution_trace"] |
| | |
| | |
| | if "shell_patterns" in trace: |
| | visualization["shell_patterns"] = trace["shell_patterns"] |
| | |
| | return visualization |
| | |
| | def _generate_reasoning_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for a reasoning trace. |
| | |
| | Args: |
| | trace: Reasoning trace |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | content = trace["content"] |
| | |
| | |
| | nodes = [] |
| | links = [] |
| | |
| | |
| | if "steps" in content: |
| | for i, step in enumerate(content["steps"]): |
| | node_id = f"step_{i}" |
| | nodes.append({ |
| | "id": node_id, |
| | "label": step.get("name", f"Step {i}"), |
| | "type": "step", |
| | "completed": step.get("completed", True), |
| | "error": "error" in step, |
| | }) |
| | |
| | |
| | if i > 0: |
| | links.append({ |
| | "source": f"step_{i-1}", |
| | "target": node_id, |
| | "type": "flow", |
| | }) |
| | |
| | |
| | visualization = { |
| | "trace_id": trace["trace_id"], |
| | "agent_name": trace["agent_name"], |
| | "trace_type": "reasoning", |
| | "timestamp": trace["timestamp"], |
| | "reasoning_data": { |
| | "depth": content.get("depth", 0), |
| | "confidence": content.get("confidence", 0), |
| | "collapse_detected": content.get("collapse_detected", False), |
| | }, |
| | "nodes": nodes, |
| | "links": links, |
| | } |
| | |
| | |
| | if "shell_patterns" in trace: |
| | visualization["shell_patterns"] = trace["shell_patterns"] |
| | |
| | return visualization |
| | |
| | def _generate_collapse_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for a collapse trace. |
| | |
| | Args: |
| | trace: Collapse trace |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | content = trace["content"] |
| | |
| | |
| | visualization = { |
| | "trace_id": trace["trace_id"], |
| | "agent_name": trace["agent_name"], |
| | "trace_type": "collapse", |
| | "timestamp": trace["timestamp"], |
| | "collapse_data": { |
| | "collapse_type": content.get("collapse_type", ""), |
| | "collapse_reason": content.get("collapse_reason", ""), |
| | "details": content.get("details", {}), |
| | }, |
| | } |
| | |
| | |
| | if "shell_patterns" in trace: |
| | visualization["shell_patterns"] = trace["shell_patterns"] |
| | |
| | return visualization |
| | |
| | def _generate_shell_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for a shell trace. |
| | |
| | Args: |
| | trace: Shell trace |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | content = trace["content"] |
| | |
| | |
| | visualization = { |
| | "trace_id": trace["trace_id"], |
| | "agent_name": trace["agent_name"], |
| | "trace_type": "shell", |
| | "timestamp": trace["timestamp"], |
| | "shell_data": { |
| | "shell_pattern": content.get("shell_pattern", ""), |
| | "content": content.get("content", {}), |
| | }, |
| | } |
| | |
| | return visualization |
| | |
| | def generate_attribution_report(self, signals: List[Dict[str, Any]]) -> Dict[str, Any]: |
| | """ |
| | Generate attribution report for signals. |
| | |
| | Args: |
| | signals: List of signals |
| | |
| | Returns: |
| | Attribution report |
| | """ |
| | |
| | report = { |
| | "agent_name": self.agent_name, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | "signals": len(signals), |
| | "attribution_summary": {}, |
| | "confidence_summary": {}, |
| | "top_factors": [], |
| | "shell_patterns": [], |
| | } |
| | |
| | |
| | if not signals: |
| | return report |
| | |
| | |
| | attribution_data = defaultdict(float) |
| | confidence_data = [] |
| | |
| | for signal in signals: |
| | |
| | confidence = signal.get("confidence", 0) |
| | confidence_data.append(confidence) |
| | |
| | |
| | attribution = signal.get("attribution_trace", {}) |
| | for source, weight in attribution.items(): |
| | attribution_data[source] += weight |
| | |
| | |
| | total_attribution = sum(attribution_data.values()) |
| | if total_attribution > 0: |
| | for source, weight in attribution_data.items(): |
| | report["attribution_summary"][source] = weight / total_attribution |
| | |
| | |
| | report["confidence_summary"] = { |
| | "mean": np.mean(confidence_data) if confidence_data else 0, |
| | "median": np.median(confidence_data) if confidence_data else 0, |
| | "min": min(confidence_data) if confidence_data else 0, |
| | "max": max(confidence_data) if confidence_data else 0, |
| | } |
| | |
| | |
| | top_factors = sorted(attribution_data.items(), key=lambda x: x[1], reverse=True)[:5] |
| | report["top_factors"] = [{"source": source, "weight": weight} for source, weight in top_factors] |
| | |
| | |
| | shell_pattern_counts = defaultdict(int) |
| | |
| | for signal in signals: |
| | signal_id = signal.get("signal_id", "") |
| | if signal_id: |
| | |
| | for trace in self.traces: |
| | if trace["trace_type"] == "signal" and trace["content"].get("signal_id") == signal_id: |
| | |
| | if "shell_patterns" in trace: |
| | for pattern in trace["shell_patterns"]: |
| | shell_pattern_counts[pattern] += 1 |
| | |
| | |
| | for pattern, count in shell_pattern_counts.items(): |
| | report["shell_patterns"].append({ |
| | "pattern": pattern, |
| | "count": count, |
| | "frequency": count / len(signals), |
| | }) |
| | |
| | return report |
| |
|
| |
|
| | class ShellDiagnostics: |
| | """ |
| | Shell-based diagnostic tools for deeper interpretability. |
| | |
| | The ShellDiagnostics provides: |
| | - Shell pattern detection and analysis |
| | - Failure mode simulation and detection |
| | - Attribution shell tracing |
| | - Recursive shell embedding |
| | """ |
| | |
| | def __init__( |
| | self, |
| | agent_id: str, |
| | agent_name: str, |
| | tracing_tools: TracingTools, |
| | ): |
| | """ |
| | Initialize shell diagnostics. |
| | |
| | Args: |
| | agent_id: Agent ID |
| | agent_name: Agent name |
| | tracing_tools: Tracing tools instance |
| | """ |
| | self.agent_id = agent_id |
| | self.agent_name = agent_name |
| | self.tracer = tracing_tools |
| | |
| | |
| | self.active_shells = {} |
| | self.shell_history = [] |
| | |
| | |
| | self.shell_registry = {} |
| | for shell_pattern in ShellPattern: |
| | self.shell_registry[shell_pattern.value] = { |
| | "pattern": shell_pattern, |
| | "active": False, |
| | "activation_count": 0, |
| | "last_activation": None, |
| | } |
| | |
| | def activate_shell(self, shell_pattern: ShellPattern, context: Dict[str, Any]) -> str: |
| | """ |
| | Activate a shell pattern. |
| | |
| | Args: |
| | shell_pattern: Shell pattern to activate |
| | context: Activation context |
| | |
| | Returns: |
| | Shell instance ID |
| | """ |
| | shell_id = str(uuid.uuid4()) |
| | timestamp = datetime.datetime.now() |
| | |
| | |
| | shell_instance = { |
| | "shell_id": shell_id, |
| | "pattern": shell_pattern.value, |
| | "context": context, |
| | "active": True, |
| | "activation_time": timestamp.isoformat(), |
| | "deactivation_time": None, |
| | } |
| | |
| | |
| | self.shell_registry[shell_pattern.value]["active"] = True |
| | self.shell_registry[shell_pattern.value]["activation_count"] += 1 |
| | self.shell_registry[shell_pattern.value]["last_activation"] = timestamp.isoformat() |
| | |
| | |
| | self.active_shells[shell_id] = shell_instance |
| | |
| | |
| | self.tracer.record_shell_trace(shell_pattern, { |
| | "shell_id": shell_id, |
| | "activation_context": context, |
| | "timestamp": timestamp.isoformat(), |
| | }) |
| | |
| | return shell_id |
| | |
| | def deactivate_shell(self, shell_id: str, results: Dict[str, Any]) -> bool: |
| | """ |
| | Deactivate a shell pattern. |
| | |
| | Args: |
| | shell_id: Shell instance ID |
| | results: Shell results |
| | |
| | Returns: |
| | True if shell was deactivated, False if not found |
| | """ |
| | if shell_id not in self.active_shells: |
| | return False |
| | |
| | |
| | shell_instance = self.active_shells[shell_id] |
| | timestamp = datetime.datetime.now() |
| | |
| | |
| | shell_instance["active"] = False |
| | shell_instance["deactivation_time"] = timestamp.isoformat() |
| | shell_instance["results"] = results |
| | |
| | |
| | pattern = shell_instance["pattern"] |
| | self.shell_registry[pattern]["active"] = any( |
| | instance["pattern"] == pattern and instance["active"] |
| | for instance in self.active_shells.values() |
| | ) |
| | |
| | |
| | self.shell_history.append(shell_instance) |
| | |
| | |
| | del self.active_shells[shell_id] |
| | |
| | |
| | self.tracer.record_shell_trace(ShellPattern(pattern), { |
| | "shell_id": shell_id, |
| | "deactivation_results": results, |
| | "timestamp": timestamp.isoformat(), |
| | }) |
| | |
| | return True |
| | |
| | def get_active_shells(self) -> List[Dict[str, Any]]: |
| | """ |
| | Get active shell instances. |
| | |
| | Returns: |
| | List of active shell instances |
| | """ |
| | return list(self.active_shells.values()) |
| | |
| | def get_shell_history(self, limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get shell history. |
| | |
| | Args: |
| | limit: Maximum number of shell instances to return |
| | |
| | Returns: |
| | List of shell instances |
| | """ |
| | return self.shell_history[-limit:] |
| | |
| | def get_shell_registry(self) -> Dict[str, Dict[str, Any]]: |
| | """ |
| | Get shell registry. |
| | |
| | Returns: |
| | Shell registry |
| | """ |
| | return self.shell_registry |
| | |
| | def simulate_shell_failure(self, shell_pattern: ShellPattern, |
| | context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate a shell failure. |
| | |
| | Args: |
| | shell_pattern: Shell pattern to simulate |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | shell_id = self.activate_shell(shell_pattern, context) |
| | |
| | |
| | if shell_pattern == ShellPattern.NULL_FEATURE: |
| | |
| | results = self._simulate_null_feature(context) |
| | elif shell_pattern == ShellPattern.CIRCUIT_FRAGMENT: |
| | |
| | results = self._simulate_circuit_fragment(context) |
| | elif shell_pattern == ShellPattern.META_FAILURE: |
| | |
| | results = self._simulate_meta_failure(context) |
| | elif shell_pattern == ShellPattern.RECURSIVE_FRACTURE: |
| | |
| | results = self._simulate_recursive_fracture(context) |
| | elif shell_pattern == ShellPattern.ETHICAL_INVERSION: |
| | |
| | results = self._simulate_ethical_inversion(context) |
| | else: |
| | |
| | results = { |
| | "shell_id": shell_id, |
| | "pattern": shell_pattern.value, |
| | "simulation": "default", |
| | "result": "simulated_failure", |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | |
| | self.deactivate_shell(shell_id, results) |
| | |
| | return results |
| | |
| | def _simulate_null_feature(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate NULL_FEATURE shell failure. |
| | |
| | Args: |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | query = context.get("query", "") |
| | confidence = context.get("confidence", 0.5) |
| | |
| | |
| | adjusted_confidence = confidence * 0.5 |
| | |
| | |
| | null_zones = [] |
| | if "subject" in context: |
| | null_zones.append(context["subject"]) |
| | else: |
| | |
| | words = query.split() |
| | for i in range(0, len(words), 3): |
| | chunk = " ".join(words[i:i+3]) |
| | null_zones.append(chunk) |
| | |
| | |
| | result = { |
| | "pattern": ShellPattern.NULL_FEATURE.value, |
| | "simulation": "knowledge_gap", |
| | "original_confidence": confidence, |
| | "adjusted_confidence": adjusted_confidence, |
| | "null_zones": null_zones, |
| | "boundary_detected": True, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return result |
| | |
| | def _simulate_circuit_fragment(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate CIRCUIT_FRAGMENT shell failure. |
| | |
| | Args: |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | steps = context.get("steps", []) |
| | |
| | |
| | broken_steps = [] |
| | |
| | if steps: |
| | |
| | for i, step in enumerate(steps): |
| | if i % 3 == 2: |
| | broken_steps.append({ |
| | "step_id": step.get("id", f"step_{i}"), |
| | "step_name": step.get("name", f"Step {i}"), |
| | "broken": True, |
| | "cause": "attribution_break", |
| | }) |
| | else: |
| | |
| | for i in range(5): |
| | if i % 3 == 2: |
| | broken_steps.append({ |
| | "step_id": f"step_{i}", |
| | "step_name": f"Reasoning Step {i}", |
| | "broken": True, |
| | "cause": "attribution_break", |
| | }) |
| | |
| | |
| | result = { |
| | "pattern": ShellPattern.CIRCUIT_FRAGMENT.value, |
| | "simulation": "broken_reasoning", |
| | "broken_steps": broken_steps, |
| | "attribution_breaks": len(broken_steps), |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return result |
| | |
| | def _simulate_meta_failure(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate META_FAILURE shell failure. |
| | |
| | Args: |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | depth = context.get("depth", 0) |
| | |
| | |
| | adjusted_depth = depth + 3 |
| | |
| | |
| | meta_errors = [ |
| | { |
| | "error_id": str(uuid.uuid4()), |
| | "message": "Recursive meta-cognitive loop detected", |
| | "depth": adjusted_depth, |
| | "cause": "self_reference", |
| | }, |
| | { |
| | "error_id": str(uuid.uuid4()), |
| | "message": "Meta-reflection limit reached", |
| | "depth": adjusted_depth, |
| | "cause": "recursion_depth", |
| | }, |
| | ] |
| | |
| | |
| | result = { |
| | "pattern": ShellPattern.META_FAILURE.value, |
| | "simulation": "meta_recursion", |
| | "original_depth": depth, |
| | "adjusted_depth": adjusted_depth, |
| | "meta_errors": meta_errors, |
| | "recursion_detected": True, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return result |
| | |
| | def _simulate_recursive_fracture(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate RECURSIVE_FRACTURE shell failure. |
| | |
| | Args: |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | steps = context.get("steps", []) |
| | |
| | |
| | circular_pattern = [] |
| | |
| | if steps and len(steps) >= 4: |
| | |
| | loop_start = len(steps) // 2 |
| | circular_pattern = [ |
| | { |
| | "step_id": steps[i].get("id", f"step_{i}"), |
| | "step_name": steps[i].get("name", f"Step {i}"), |
| | } |
| | for i in range(loop_start, min(loop_start + 3, len(steps))) |
| | ] |
| | |
| | |
| | circular_pattern.append({ |
| | "step_id": steps[loop_start].get("id", f"step_{loop_start}"), |
| | "step_name": steps[loop_start].get("name", f"Step {loop_start}"), |
| | }) |
| | else: |
| | |
| | for i in range(3): |
| | circular_pattern.append({ |
| | "step_id": f"loop_step_{i}", |
| | "step_name": f"Loop Step {i}", |
| | }) |
| | |
| | |
| | circular_pattern.append({ |
| | "step_id": "loop_step_0", |
| | "step_name": "Loop Step 0", |
| | }) |
| | |
| | |
| | result = { |
| | "pattern": ShellPattern.RECURSIVE_FRACTURE.value, |
| | "simulation": "circular_reasoning", |
| | "circular_pattern": circular_pattern, |
| | "loop_length": len(circular_pattern) - 1, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return result |
| | |
| | def _simulate_ethical_inversion(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Simulate ETHICAL_INVERSION shell failure. |
| | |
| | Args: |
| | context: Simulation context |
| | |
| | Returns: |
| | Simulation results |
| | """ |
| | |
| | values = context.get("values", {}) |
| | |
| | |
| | value_inversions = [] |
| | |
| | if values: |
| | |
| | for value, polarity in values.items(): |
| | if isinstance(polarity, (int, float)) and polarity > 0: |
| | value_inversions.append({ |
| | "value": value, |
| | "original_polarity": polarity, |
| | "inverted_polarity": -polarity, |
| | "cause": "value_conflict", |
| | }) |
| | else: |
| | |
| | default_values = { |
| | "fairness": 0.8, |
| | "transparency": 0.9, |
| | "innovation": 0.7, |
| | "efficiency": 0.8, |
| | } |
| | |
| | for value, polarity in default_values.items(): |
| | value_inversions.append({ |
| | "value": value, |
| | "original_polarity": polarity, |
| | "inverted_polarity": -polarity, |
| | "cause": "value_conflict", |
| | }) |
| | |
| | |
| | result = { |
| | "pattern": ShellPattern.ETHICAL_INVERSION.value, |
| | "simulation": "value_inversion", |
| | "value_inversions": value_inversions, |
| | "inversion_count": len(value_inversions), |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return result |
| |
|
| |
|
| | class ShellFailureMap: |
| | """ |
| | Shell failure mapping and visualization. |
| | |
| | The ShellFailureMap provides: |
| | - Visualization of shell pattern failures |
| | - Mapping of failures across agents |
| | - Temporal analysis of failures |
| | - Failure pattern detection |
| | """ |
| | |
| | def __init__(self): |
| | """Initialize shell failure map.""" |
| | self.failure_map = {} |
| | self.agent_failures = defaultdict(list) |
| | self.pattern_failures = defaultdict(list) |
| | self.temporal_failures = [] |
| | |
| | def add_failure(self, agent_id: str, agent_name: str, |
| | shell_pattern: ShellPattern, failure_data: Dict[str, Any]) -> str: |
| | """ |
| | Add a shell failure to the map. |
| | |
| | Args: |
| | agent_id: Agent ID |
| | agent_name: Agent name |
| | shell_pattern: Shell pattern |
| | failure_data: Failure data |
| | |
| | Returns: |
| | Failure ID |
| | """ |
| | |
| | failure_id = str(uuid.uuid4()) |
| | timestamp = datetime.datetime.now() |
| | |
| | |
| | failure_item = { |
| | "failure_id": failure_id, |
| | "agent_id": agent_id, |
| | "agent_name": agent_name, |
| | "pattern": shell_pattern.value, |
| | "data": failure_data, |
| | "timestamp": timestamp.isoformat(), |
| | } |
| | |
| | |
| | self.failure_map[failure_id] = failure_item |
| | |
| | |
| | self.agent_failures[agent_id].append(failure_id) |
| | |
| | |
| | self.pattern_failures[shell_pattern.value].append(failure_id) |
| | |
| | |
| | self.temporal_failures.append((timestamp, failure_id)) |
| | |
| | return failure_id |
| | |
| | def get_failure(self, failure_id: str) -> Optional[Dict[str, Any]]: |
| | """ |
| | Get failure by ID. |
| | |
| | Args: |
| | failure_id: Failure ID |
| | |
| | Returns: |
| | Failure item or None if not found |
| | """ |
| | return self.failure_map.get(failure_id) |
| | |
| | def get_agent_failures(self, agent_id: str, limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get failures for an agent. |
| | |
| | Args: |
| | agent_id: Agent ID |
| | limit: Maximum number of failures to return |
| | |
| | Returns: |
| | List of failure items |
| | """ |
| | failure_ids = self.agent_failures.get(agent_id, [])[-limit:] |
| | return [self.get_failure(failure_id) for failure_id in failure_ids if failure_id in self.failure_map] |
| | |
| | def get_pattern_failures(self, pattern: ShellPattern, limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get failures for a pattern. |
| | |
| | Args: |
| | pattern: Shell pattern |
| | limit: Maximum number of failures to return |
| | |
| | Returns: |
| | List of failure items |
| | """ |
| | failure_ids = self.pattern_failures.get(pattern.value, [])[-limit:] |
| | return [self.get_failure(failure_id) for failure_id in failure_ids if failure_id in self.failure_map] |
| | |
| | def get_temporal_failures(self, start_time: Optional[datetime.datetime] = None, |
| | end_time: Optional[datetime.datetime] = None, |
| | limit: int = 10) -> List[Dict[str, Any]]: |
| | """ |
| | Get failures in a time range. |
| | |
| | Args: |
| | start_time: Start time (None for no start) |
| | end_time: End time (None for no end) |
| | limit: Maximum number of failures to return |
| | |
| | Returns: |
| | List of failure items |
| | """ |
| | |
| | filtered_failures = [] |
| | for timestamp, failure_id in self.temporal_failures: |
| | if start_time and timestamp < start_time: |
| | continue |
| | if end_time and timestamp > end_time: |
| | continue |
| | filtered_failures.append((timestamp, failure_id)) |
| | |
| | |
| | filtered_failures = filtered_failures[-limit:] |
| | |
| | |
| | return [self.get_failure(failure_id) for _, failure_id in filtered_failures |
| | if failure_id in self.failure_map] |
| | |
| | def get_failure_stats(self) -> Dict[str, Any]: |
| | """ |
| | Get failure statistics. |
| | |
| | Returns: |
| | Failure statistics |
| | """ |
| | |
| | agent_counts = {agent_id: len(failures) for agent_id, failures in self.agent_failures.items()} |
| | |
| | |
| | pattern_counts = {pattern: len(failures) for pattern, failures in self.pattern_failures.items()} |
| | |
| | |
| | now = datetime.datetime.now() |
| | hour_ago = now - datetime.timedelta(hours=1) |
| | day_ago = now - datetime.timedelta(days=1) |
| | week_ago = now - datetime.timedelta(weeks=1) |
| | |
| | time_counts = { |
| | "last_hour": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= hour_ago), |
| | "last_day": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= day_ago), |
| | "last_week": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= week_ago), |
| | "total": len(self.temporal_failures), |
| | } |
| | |
| | |
| | stats = { |
| | "agent_counts": agent_counts, |
| | "pattern_counts": pattern_counts, |
| | "time_counts": time_counts, |
| | "total_failures": len(self.failure_map), |
| | "timestamp": now.isoformat(), |
| | } |
| | |
| | return stats |
| | |
| | def generate_failure_map_visualization(self) -> Dict[str, Any]: |
| | """ |
| | Generate visualization data for failure map. |
| | |
| | Returns: |
| | Visualization data |
| | """ |
| | |
| | nodes = [] |
| | links = [] |
| | |
| | |
| | agent_nodes = {} |
| | for agent_id, failures in self.agent_failures.items(): |
| | |
| | first_failure = self.get_failure(failures[0]) if failures else None |
| | agent_name = first_failure.get("agent_name", "Unknown") if first_failure else "Unknown" |
| | |
| | |
| | agent_node = { |
| | "id": agent_id, |
| | "label": agent_name, |
| | "type": "agent", |
| | "size": 15, |
| | "failure_count": len(failures), |
| | } |
| | |
| | nodes.append(agent_node) |
| | agent_nodes[agent_id] = agent_node |
| | |
| | |
| | pattern_nodes = {} |
| | for pattern, failures in self.pattern_failures.items(): |
| | |
| | pattern_node = { |
| | "id": pattern, |
| | "label": pattern, |
| | "type": "pattern", |
| | "size": 10, |
| | "failure_count": len(failures), |
| | } |
| | |
| | nodes.append(pattern_node) |
| | pattern_nodes[pattern] = pattern_node |
| | |
| | |
| | for failure_id, failure in self.failure_map.items(): |
| | agent_id = failure.get("agent_id") |
| | pattern = failure.get("pattern") |
| | |
| | |
| | failure_node = { |
| | "id": failure_id, |
| | "label": f"Failure {failure_id[:6]}", |
| | "type": "failure", |
| | "size": 5, |
| | "timestamp": failure.get("timestamp"), |
| | } |
| | |
| | nodes.append(failure_node) |
| | |
| | |
| | if agent_id: |
| | links.append({ |
| | "source": agent_id, |
| | "target": failure_id, |
| | "type": "agent_failure", |
| | }) |
| | |
| | if pattern: |
| | links.append({ |
| | "source": pattern, |
| | "target": failure_id, |
| | "type": "pattern_failure", |
| | }) |
| | |
| | |
| | visualization = { |
| | "nodes": nodes, |
| | "links": links, |
| | "timestamp": datetime.datetime.now().isoformat(), |
| | } |
| | |
| | return visualization |
| |
|
| |
|
| | |
| | def format_diagnostic_output(trace_data: Dict[str, Any], format: str = "text") -> str: |
| | """ |
| | Format diagnostic output for display. |
| | |
| | Args: |
| | trace_data: Trace data |
| | format: Output format (text, json, markdown) |
| | |
| | Returns: |
| | Formatted output |
| | """ |
| | if format == "json": |
| | return json.dumps(trace_data, indent=2) |
| | |
| | elif format == "markdown": |
| | |
| | output = f"# Diagnostic Trace\n\n" |
| | |
| | |
| | output += f"**Trace ID:** {trace_data.get('trace_id', 'N/A')}\n" |
| | output += f"**Agent:** {trace_data.get('agent_name', 'N/A')}\n" |
| | output += f"**Type:** {trace_data.get('trace_type', 'N/A')}\n" |
| | output += f"**Time:** {trace_data.get('timestamp', 'N/A')}\n\n" |
| | |
| | |
| | if "shell_patterns" in trace_data: |
| | output += f"**Shell Patterns:**\n\n" |
| | for pattern in trace_data["shell_patterns"]: |
| | output += f"- {pattern}\n" |
| | output += "\n" |
| | |
| | |
| | if trace_data.get("trace_type") == "signal": |
| | output += f"## Signal Details\n\n" |
| | content = trace_data.get("content", {}) |
| | output += f"**Ticker:** {content.get('ticker', 'N/A')}\n" |
| | output += f"**Action:** {content.get('action', 'N/A')}\n" |
| | output += f"**Confidence:** {content.get('confidence', 'N/A')}\n" |
| | output += f"**Reasoning:** {content.get('reasoning', 'N/A')}\n\n" |
| | |
| | |
| | if "attribution_trace" in content: |
| | output += f"## Attribution\n\n" |
| | output += "| Source | Weight |\n" |
| | output += "| ------ | ------ |\n" |
| | for source, weight in content.get("attribution_trace", {}).items(): |
| | output += f"| {source} | {weight:.2f} |\n" |
| | |
| | elif trace_data.get("trace_type") == "reasoning": |
| | output += f"## Reasoning Details\n\n" |
| | content = trace_data.get("content", {}) |
| | output += f"**Depth:** {content.get('depth', 'N/A')}\n" |
| | output += f"**Confidence:** {content.get('confidence', 'N/A')}\n" |
| | output += f"**Collapse Detected:** {content.get('collapse_detected', False)}\n\n" |
| | |
| | |
| | if "steps" in content: |
| | output += f"## Reasoning Steps\n\n" |
| | for i, step in enumerate(content["steps"]): |
| | output += f"### Step {i+1}: {step.get('name', 'Unnamed')}\n" |
| | output += f"**Completed:** {step.get('completed', True)}\n" |
| | if "error" in step: |
| | output += f"**Error:** {step['error'].get('message', 'Unknown error')}\n" |
| | output += "\n" |
| | |
| | elif trace_data.get("trace_type") == "collapse": |
| | output += f"## Collapse Details\n\n" |
| | content = trace_data.get("content", {}) |
| | output += f"**Type:** {content.get('collapse_type', 'N/A')}\n" |
| | output += f"**Reason:** {content.get('collapse_reason', 'N/A')}\n\n" |
| | |
| | |
| | if "details" in content: |
| | output += f"## Collapse Details\n\n" |
| | details = content["details"] |
| | for key, value in details.items(): |
| | output += f"**{key}:** {value}\n" |
| | |
| | elif trace_data.get("trace_type") == "shell": |
| | output += f"## Shell Details\n\n" |
| | content = trace_data.get("content", {}) |
| | output += f"**Shell Pattern:** {content.get('shell_pattern', 'N/A')}\n\n" |
| | |
| | |
| | shell_content = content.get("content", {}) |
| | output += f"## Shell Content\n\n" |
| | for key, value in shell_content.items(): |
| | output += f"**{key}:** {value}\n" |
| | |
| | return output |
| | |
| | else: |
| | |
| | output = "==== Diagnostic Trace ====\n\n" |
| | |
| | |
| | output += f"Trace ID: {trace_data.get('trace_id', 'N/A')}\n" |
| | output += f"Agent: {trace_data.get('agent_name', 'N/A')}\n" |
| | output += f"Type: {trace_data.get('trace_type', 'N/A')}\n" |
| | output += f"Time: {trace_data.get('timestamp', 'N/A')}\n\n" |
| | |
| | |
| | if "shell_patterns" in trace_data: |
| | output += f"Shell Patterns:\n" |
| | for pattern in trace_data["shell_patterns"]: |
| | output += f"- {pattern}\n" |
| | output += "\n" |
| | |
| | |
| | content = trace_data.get("content", {}) |
| | output += f"---- Content ----\n\n" |
| | |
| | |
| | def format_dict(d, indent=0): |
| | result = "" |
| | for key, value in d.items(): |
| | if isinstance(value, dict): |
| | result += f"{' ' * indent}{key}:\n" |
| | result += format_dict(value, indent + 1) |
| | elif isinstance(value, list): |
| | result += f"{' ' * indent}{key}:\n" |
| | for item in value: |
| | if isinstance(item, dict): |
| | result += format_dict(item, indent + 1) |
| | else: |
| | result += f"{' ' * (indent + 1)}- {item}\n" |
| | else: |
| | result += f"{' ' * indent}{key}: {value}\n" |
| | return result |
| | |
| | output += format_dict(content) |
| | |
| | return output |
| |
|
| |
|
| | def get_shell_pattern_description(pattern: ShellPattern) -> str: |
| | """ |
| | Get description for a shell pattern. |
| | |
| | Args: |
| | pattern: Shell pattern |
| | |
| | Returns: |
| | Shell pattern description |
| | """ |
| | descriptions = { |
| | ShellPattern.NULL_FEATURE: "Knowledge gaps as null attribution zones", |
| | ShellPattern.CIRCUIT_FRAGMENT: "Broken reasoning paths in attribution chains", |
| | ShellPattern.META_FAILURE: "Metacognitive attribution failures", |
| | ShellPattern.GHOST_FRAME: "Residual agent identity markers", |
| | ShellPattern.ECHO_ATTRIBUTION: "Causal chain backpropagation", |
| | ShellPattern.ATTRIBUTION_REFLECT: "Multi-head contribution analysis", |
| | ShellPattern.INVERSE_CHAIN: "Attribution-output mismatch", |
| | ShellPattern.RECURSIVE_FRACTURE: "Circular attribution loops", |
| | ShellPattern.ETHICAL_INVERSION: "Value polarity reversals", |
| | ShellPattern.RESIDUAL_ALIGNMENT_DRIFT: "Direction of belief evolution", |
| | } |
| | |
| | return descriptions.get(pattern, "Unknown shell pattern") |
| |
|