"""Profiling data analysis and visualization data generation. This module processes raw profiling records to build call trees, compute statistics, and generate data structures for visualization. """ import logging from typing import List, Dict, Optional, Any from collections import defaultdict from dataclasses import dataclass, field from .protocol import ProfileRecord from .symbolizer import Symbolizer logger = logging.getLogger(__name__) @dataclass class FunctionStats: """Statistics for a single function.""" name: str address: int call_count: int = 0 total_time_us: int = 0 min_time_us: int = float('inf') max_time_us: int = 0 self_time_us: int = 0 # Time excluding children def update(self, duration_us: int): """Update statistics with a new duration measurement.""" self.call_count += 1 self.total_time_us += duration_us self.min_time_us = min(self.min_time_us, duration_us) self.max_time_us = max(self.max_time_us, duration_us) @property def avg_time_us(self) -> float: """Average execution time in microseconds.""" return self.total_time_us / self.call_count if self.call_count > 0 else 0 @dataclass class CallTreeNode: """Node in the call tree.""" name: str address: int entry_time: int duration_us: int depth: int children: List['CallTreeNode'] = field(default_factory=list) def add_child(self, node: 'CallTreeNode'): """Add a child node to this node.""" self.children.append(node) def to_flamegraph_dict(self) -> Dict[str, Any]: """Convert to d3-flame-graph format. Returns: Dictionary in the format: { "name": "function_name", "value": duration_in_microseconds, "children": [child_dicts...] } """ result = { "name": self.name, "value": self.duration_us } if self.children: result["children"] = [child.to_flamegraph_dict() for child in self.children] return result def to_timeline_dict(self) -> Dict[str, Any]: """Convert to timeline/flame chart format. Returns: Dictionary with timing information for Plotly timeline """ return { "name": self.name, "start": self.entry_time, "duration": self.duration_us, "depth": self.depth, "children": [child.to_timeline_dict() for child in self.children] } class ProfileAnalyzer: """Analyzes profiling data and generates visualization data.""" def __init__(self, symbolizer: Optional[Symbolizer] = None): """Initialize the analyzer. Args: symbolizer: Symbolizer for resolving addresses to names """ self.symbolizer = symbolizer self.records: List[ProfileRecord] = [] self.stats: Dict[int, FunctionStats] = {} # addr -> stats self.call_tree: Optional[CallTreeNode] = None self.timeline_events: List[Dict[str, Any]] = [] def add_records(self, records: List[ProfileRecord]): """Add profiling records for analysis. Args: records: List of ProfileRecord objects """ self.records.extend(records) logger.debug(f"Added {len(records)} records, total: {len(self.records)}") def clear(self): """Clear all recorded data.""" self.records.clear() self.stats.clear() self.call_tree = None self.timeline_events.clear() logger.info("Cleared all profiling data") def _resolve_name(self, addr: int) -> str: """Resolve address to function name.""" if self.symbolizer: return self.symbolizer.resolve_name(addr) return f"func_0x{addr:08x}" def compute_statistics(self) -> Dict[int, FunctionStats]: """Compute statistics for all functions. Returns: Dictionary mapping addresses to FunctionStats """ self.stats.clear() for record in self.records: addr = record.func_addr name = self._resolve_name(addr) if addr not in self.stats: self.stats[addr] = FunctionStats(name=name, address=addr) self.stats[addr].update(record.duration_us) logger.info(f"Computed statistics for {len(self.stats)} functions") return self.stats def build_call_tree(self) -> Optional[CallTreeNode]: """Build call tree from profiling records. The call tree is built using the depth field to determine parent-child relationships. Returns: Root node of the call tree, or None if no records """ if not self.records: return None # Sort records by entry time to process in chronological order sorted_records = sorted(self.records, key=lambda r: r.entry_time) # Stack to track current path in the tree # stack[depth] = node at that depth stack: List[CallTreeNode] = [] root = None for record in sorted_records: name = self._resolve_name(record.func_addr) node = CallTreeNode( name=name, address=record.func_addr, entry_time=record.entry_time, duration_us=record.duration_us, depth=record.depth ) # Adjust stack to current depth while len(stack) > record.depth: stack.pop() # Add node to tree if record.depth == 0: # Root level function if root is None: root = node stack = [root] else: # Multiple root-level functions - create synthetic root if not isinstance(root.name, str) or not root.name.startswith("__root__"): synthetic_root = CallTreeNode( name="__root__", address=0, entry_time=0, duration_us=0, depth=-1 ) synthetic_root.add_child(root) root = synthetic_root stack = [root] root.add_child(node) # Update root duration to encompass all children root.duration_us = max(root.duration_us, node.entry_time + node.duration_us) else: # Child function if len(stack) >= record.depth: parent = stack[record.depth - 1] parent.add_child(node) else: logger.warning(f"Orphan node at depth {record.depth}: {name}") continue # Push to stack if we're going deeper if len(stack) == record.depth: stack.append(node) elif len(stack) == record.depth + 1: stack[record.depth] = node self.call_tree = root logger.info(f"Built call tree with {len(sorted_records)} nodes") return root def to_flamegraph_json(self) -> Dict[str, Any]: """Generate flame graph data in d3-flame-graph format. Returns: Dictionary suitable for d3-flame-graph """ if self.call_tree is None: self.build_call_tree() if self.call_tree is None: return {"name": "root", "value": 0, "children": []} return self.call_tree.to_flamegraph_dict() def to_timeline_json(self) -> List[Dict[str, Any]]: """Generate timeline data for flame chart visualization. Returns: List of events for timeline/flame chart """ events = [] for record in sorted(self.records, key=lambda r: r.entry_time): name = self._resolve_name(record.func_addr) events.append({ "name": name, "start": record.entry_time, "end": record.entry_time + record.duration_us, "duration": record.duration_us, "depth": record.depth }) return events def to_statistics_json(self) -> List[Dict[str, Any]]: """Generate statistics table data. Returns: List of function statistics dictionaries """ if not self.stats: self.compute_statistics() stats_list = [] for func_stats in self.stats.values(): stats_list.append({ "name": func_stats.name, "address": f"0x{func_stats.address:08x}", "calls": func_stats.call_count, "total_us": func_stats.total_time_us, "avg_us": func_stats.avg_time_us, "min_us": func_stats.min_time_us, "max_us": func_stats.max_time_us, }) # Sort by total time (descending) stats_list.sort(key=lambda x: x["total_us"], reverse=True) return stats_list def get_summary(self) -> Dict[str, Any]: """Get summary statistics. Returns: Dictionary with summary information """ if not self.stats: self.compute_statistics() total_records = len(self.records) total_functions = len(self.stats) total_time = sum(s.total_time_us for s in self.stats.values()) hottest = None if self.stats: hottest = max(self.stats.values(), key=lambda s: s.total_time_us) return { "total_records": total_records, "total_functions": total_functions, "total_time_us": total_time, "hottest_function": hottest.name if hottest else None, "hottest_time_us": hottest.total_time_us if hottest else 0, }