"""Flask web server for MiniProfiler visualization. Provides a web interface for real-time profiling visualization including flame graphs, timelines, and statistics tables. """ import logging from flask import Flask, render_template, jsonify, request from flask_socketio import SocketIO, emit from typing import Optional import threading from .serial_reader import SerialReader from .analyzer import ProfileAnalyzer from .symbolizer import Symbolizer from .protocol import ProfileRecord, Metadata, StatusInfo logger = logging.getLogger(__name__) class ProfilerWebServer: """Web server for profiler visualization and control.""" def __init__(self, host: str = '0.0.0.0', port: int = 5000): """Initialize the web server. Args: host: Host address to bind to port: Port number to listen on """ self.host = host self.port = port # Initialize Flask app self.app = Flask(__name__, template_folder='../web/templates', static_folder='../web/static') self.app.config['SECRET_KEY'] = 'miniprofiler-secret-key' # Initialize SocketIO self.socketio = SocketIO(self.app, cors_allowed_origins="*") # Profiler components self.serial_reader: Optional[SerialReader] = None self.analyzer = ProfileAnalyzer() self.symbolizer: Optional[Symbolizer] = None self.metadata: Optional[Metadata] = None # State self.is_connected = False self.is_profiling = False # Setup routes self._setup_routes() self._setup_socketio_handlers() def _setup_routes(self): """Setup Flask HTTP routes.""" @self.app.route('/') def index(): """Main page.""" return render_template('index.html') @self.app.route('/api/status') def status(): """Get server status.""" return jsonify({ 'connected': self.is_connected, 'profiling': self.is_profiling, 'has_data': len(self.analyzer.records) > 0, 'record_count': len(self.analyzer.records) }) @self.app.route('/api/summary') def summary(): """Get profiling summary statistics.""" return jsonify(self.analyzer.get_summary()) @self.app.route('/api/flamegraph') def flamegraph(): """Get flame graph data.""" return jsonify(self.analyzer.to_flamegraph_json()) @self.app.route('/api/timeline') def timeline(): """Get timeline data.""" return jsonify(self.analyzer.to_timeline_json()) @self.app.route('/api/statistics') def statistics(): """Get statistics table data.""" return jsonify(self.analyzer.to_statistics_json()) @self.app.route('/api/sample/flamegraph') def sample_flamegraph(): """Get sample flame graph data.""" import os import json sample_file = os.path.join(os.path.dirname(__file__), '../tests/sample_flamegraph.json') if os.path.exists(sample_file): with open(sample_file, 'r') as f: return jsonify(json.load(f)) return jsonify({"error": "Sample data not found"}), 404 @self.app.route('/api/sample/statistics') def sample_statistics(): """Get sample statistics data.""" import os import json sample_file = os.path.join(os.path.dirname(__file__), '../tests/sample_statistics.json') if os.path.exists(sample_file): with open(sample_file, 'r') as f: return jsonify(json.load(f)) return jsonify({"error": "Sample data not found"}), 404 @self.app.route('/api/sample/timeline') def sample_timeline(): """Get sample timeline data.""" import os import json sample_file = os.path.join(os.path.dirname(__file__), '../tests/sample_timeline.json') if os.path.exists(sample_file): with open(sample_file, 'r') as f: return jsonify(json.load(f)) return jsonify({"error": "Sample data not found"}), 404 @self.app.route('/api/sample/generate') def generate_sample(): """Generate sample profiling data on-the-fly.""" import sys import os # Add tests directory to path to import sample data generator tests_dir = os.path.join(os.path.dirname(__file__), '../tests') if tests_dir not in sys.path: sys.path.insert(0, tests_dir) try: from sample_data_generator import generate_sample_profile_data, FUNCTIONS # Generate sample records records = generate_sample_profile_data(num_iterations=20) # Create temporary analyzer to process the data temp_analyzer = ProfileAnalyzer() temp_analyzer.add_records(records) # Generate all three visualization formats flamegraph_data = temp_analyzer.to_flamegraph_json() stats_data = temp_analyzer.to_statistics_json() timeline_data = temp_analyzer.to_timeline_json() # Add human-readable function names to flamegraph def add_names(node): """Recursively add function names to flamegraph nodes.""" if 'name' in node and node['name'].startswith('func_0x'): try: # Extract address from "func_0x08000100" format addr = int(node['name'][5:], 16) # Skip "func_" if addr in FUNCTIONS: node['name'] = FUNCTIONS[addr] except ValueError: pass if 'children' in node: for child in node['children']: add_names(child) add_names(flamegraph_data) # Add function names to statistics logger.debug(f"Processing {len(stats_data)} statistics entries") logger.debug(f"FUNCTIONS has {len(FUNCTIONS)} entries") for stat in stats_data: if stat['name'].startswith('func_0x'): try: addr = int(stat['name'][5:], 16) # Skip "func_" if addr in FUNCTIONS: old_name = stat['name'] stat['name'] = FUNCTIONS[addr] logger.debug(f"Replaced {old_name} with {stat['name']}") else: logger.debug(f"Address 0x{addr:08x} not in FUNCTIONS") except ValueError as e: logger.error(f"ValueError parsing {stat['name']}: {e}") # Add function names to timeline for event in timeline_data: if event['name'].startswith('func_0x'): try: addr = int(event['name'][5:], 16) # Skip "func_" if addr in FUNCTIONS: event['name'] = FUNCTIONS[addr] except ValueError: pass # Return combined response return jsonify({ 'flamegraph': flamegraph_data, 'statistics': stats_data, 'timeline': timeline_data }) except Exception as e: logger.error(f"Error generating sample data: {e}") return jsonify({"error": str(e)}), 500 def _setup_socketio_handlers(self): """Setup SocketIO event handlers.""" @self.socketio.on('connect') def handle_connect(): """Handle client connection.""" logger.info("Client connected") emit('status', { 'connected': self.is_connected, 'profiling': self.is_profiling }) @self.socketio.on('disconnect') def handle_disconnect(): """Handle client disconnection.""" logger.info("Client disconnected") @self.socketio.on('connect_serial') def handle_connect_serial(data): """Connect to serial port. Args: data: Dict with 'port' and optional 'baudrate' """ port = data.get('port') baudrate = data.get('baudrate', 115200) elf_path = data.get('elf_path', None) if not port: emit('error', {'message': 'No port specified'}) return try: # Load symbolizer if ELF path provided if elf_path: self.symbolizer = Symbolizer(elf_path) self.analyzer.symbolizer = self.symbolizer # Create serial reader self.serial_reader = SerialReader(port, baudrate) # Set up callbacks self.serial_reader.on_profile_data = self._on_profile_data self.serial_reader.on_metadata = self._on_metadata self.serial_reader.on_status = self._on_status self.serial_reader.on_error = self._on_error # Connect if self.serial_reader.connect(): self.serial_reader.start_reading() self.is_connected = True # Request metadata self.serial_reader.get_metadata() emit('connected', {'port': port, 'baudrate': baudrate}) logger.info(f"Connected to {port} at {baudrate} baud") else: emit('error', {'message': f'Failed to connect to {port}'}) except Exception as e: logger.error(f"Error connecting to serial: {e}") emit('error', {'message': str(e)}) @self.socketio.on('disconnect_serial') def handle_disconnect_serial(): """Disconnect from serial port.""" if self.serial_reader: self.serial_reader.stop_reading() self.serial_reader.disconnect() self.serial_reader = None self.is_connected = False self.is_profiling = False emit('disconnected', {}) logger.info("Disconnected from serial port") @self.socketio.on('start_profiling') def handle_start_profiling(): """Start profiling on the device.""" if not self.serial_reader or not self.is_connected: emit('error', {'message': 'Not connected to device'}) return if self.serial_reader.start_profiling(): self.is_profiling = True emit('profiling_started', {}) logger.info("Started profiling") else: emit('error', {'message': 'Failed to start profiling'}) @self.socketio.on('stop_profiling') def handle_stop_profiling(): """Stop profiling on the device.""" if not self.serial_reader or not self.is_connected: emit('error', {'message': 'Not connected to device'}) return if self.serial_reader.stop_profiling(): self.is_profiling = False emit('profiling_stopped', {}) logger.info("Stopped profiling") else: emit('error', {'message': 'Failed to stop profiling'}) @self.socketio.on('clear_data') def handle_clear_data(): """Clear all profiling data.""" self.analyzer.clear() emit('data_cleared', {}) self._emit_data_update() logger.info("Cleared profiling data") @self.socketio.on('reset_buffers') def handle_reset_buffers(): """Reset device buffers.""" if not self.serial_reader or not self.is_connected: emit('error', {'message': 'Not connected to device'}) return if self.serial_reader.reset_buffers(): emit('buffers_reset', {}) logger.info("Reset device buffers") else: emit('error', {'message': 'Failed to reset buffers'}) def _on_profile_data(self, records): """Callback for receiving profile data. Args: records: List of ProfileRecord objects """ logger.debug(f"Received {len(records)} profile records") self.analyzer.add_records(records) self._emit_data_update() def _on_metadata(self, metadata: Metadata): """Callback for receiving metadata. Args: metadata: Metadata object """ logger.info(f"Received metadata: {metadata.fw_version}, " f"MCU: {metadata.mcu_clock_hz / 1e6:.1f} MHz") self.metadata = metadata self.socketio.emit('metadata', { 'fw_version': metadata.fw_version, 'mcu_clock_hz': metadata.mcu_clock_hz, 'timer_freq': metadata.timer_freq, 'build_id': f"0x{metadata.elf_build_id:08X}" }) def _on_status(self, status: StatusInfo): """Callback for receiving status updates. Args: status: StatusInfo object """ logger.debug(f"Device status: profiling={status.is_profiling}, " f"records={status.records_captured}, " f"overflows={status.buffer_overflows}") self.socketio.emit('device_status', { 'is_profiling': status.is_profiling, 'records_captured': status.records_captured, 'buffer_overflows': status.buffer_overflows, 'buffer_usage': status.buffer_usage_percent }) def _on_error(self, error: Exception): """Callback for serial errors. Args: error: Exception that occurred """ logger.error(f"Serial error: {error}") self.socketio.emit('error', {'message': str(error)}) def _emit_data_update(self): """Emit updated profiling data to all clients.""" try: # Send summary summary = self.analyzer.get_summary() self.socketio.emit('summary_update', summary) # Send flamegraph data flamegraph_data = self.analyzer.to_flamegraph_json() self.socketio.emit('flamegraph_update', flamegraph_data) # Send statistics stats_data = self.analyzer.to_statistics_json() self.socketio.emit('statistics_update', stats_data) # Send timeline data (can be large, so only send periodically) if len(self.analyzer.records) % 50 == 0: # Every 50 records timeline_data = self.analyzer.to_timeline_json() self.socketio.emit('timeline_update', timeline_data) except Exception as e: logger.error(f"Error emitting data update: {e}") def run(self, debug: bool = False): """Run the web server. Args: debug: Enable debug mode """ logger.info(f"Starting web server on {self.host}:{self.port}") self.socketio.run(self.app, host=self.host, port=self.port, debug=debug) def create_app(host: str = '0.0.0.0', port: int = 5000) -> ProfilerWebServer: """Create and configure the profiler web server. Args: host: Host address port: Port number Returns: Configured ProfilerWebServer instance """ return ProfilerWebServer(host, port)