Files
MiniProfiler/host/miniprofiler/serial_reader.py
Atharva Sawant 852957a7de Initialized MiniProfiler project
- Contains the host code with a protocol implementation, data analyser and web-based visualiser
2025-11-27 20:34:41 +05:30

268 lines
8.7 KiB
Python

"""Serial communication module for MiniProfiler.
Handles UART communication with the embedded device, including
sending commands and receiving profiling data.
"""
import serial
import threading
import time
import logging
from typing import Callable, Optional, List
from queue import Queue
from .protocol import (
Command, CommandPacket, ResponsePacket, ResponseType,
ProfileRecord, Metadata, StatusInfo
)
logger = logging.getLogger(__name__)
class SerialReader:
"""Manages serial communication with the embedded profiling device."""
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0):
"""Initialize serial reader.
Args:
port: Serial port name (e.g., '/dev/ttyUSB0', 'COM3')
baudrate: Baud rate (default: 115200)
timeout: Read timeout in seconds
"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial: Optional[serial.Serial] = None
self.running = False
self.read_thread: Optional[threading.Thread] = None
self.command_queue = Queue()
# Callbacks
self.on_profile_data: Optional[Callable[[List[ProfileRecord]], None]] = None
self.on_metadata: Optional[Callable[[Metadata], None]] = None
self.on_status: Optional[Callable[[StatusInfo], None]] = None
self.on_error: Optional[Callable[[Exception], None]] = None
def connect(self) -> bool:
"""Open the serial port connection.
Returns:
True if connection successful, False otherwise
"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE
)
logger.info(f"Connected to {self.port} at {self.baudrate} baud")
return True
except serial.SerialException as e:
logger.error(f"Failed to connect to {self.port}: {e}")
if self.on_error:
self.on_error(e)
return False
def disconnect(self):
"""Close the serial port connection."""
if self.serial and self.serial.is_open:
self.serial.close()
logger.info(f"Disconnected from {self.port}")
def send_command(self, cmd: Command, payload: bytes = b'') -> bool:
"""Send a command to the embedded device.
Args:
cmd: Command to send
payload: Optional command payload
Returns:
True if command sent successfully, False otherwise
"""
if not self.serial or not self.serial.is_open:
logger.error("Serial port not open")
return False
try:
packet = CommandPacket(cmd, payload)
data = packet.to_bytes()
self.serial.write(data)
logger.debug(f"Sent command: {cmd.name}")
return True
except Exception as e:
logger.error(f"Failed to send command {cmd.name}: {e}")
if self.on_error:
self.on_error(e)
return False
def start_profiling(self) -> bool:
"""Send START_PROFILING command."""
return self.send_command(Command.START_PROFILING)
def stop_profiling(self) -> bool:
"""Send STOP_PROFILING command."""
return self.send_command(Command.STOP_PROFILING)
def get_metadata(self) -> bool:
"""Request metadata from the device."""
return self.send_command(Command.GET_METADATA)
def get_status(self) -> bool:
"""Request status from the device."""
return self.send_command(Command.GET_STATUS)
def reset_buffers(self) -> bool:
"""Send RESET_BUFFERS command."""
return self.send_command(Command.RESET_BUFFERS)
def _read_packet(self) -> Optional[ResponsePacket]:
"""Read a response packet from the serial port.
Returns:
ResponsePacket if valid packet received, None otherwise
"""
if not self.serial or not self.serial.is_open:
return None
buffer = bytearray()
try:
# Search for header (0xAA55)
while self.running:
byte = self.serial.read(1)
if not byte:
continue
buffer.append(byte[0])
# Check for header
if len(buffer) >= 2:
header = (buffer[-2] << 8) | buffer[-1]
if header == 0xAA55:
# Found header, read rest of packet
buffer = bytearray([0xAA, 0x55])
break
# Keep only last byte for next iteration
if len(buffer) > 1:
buffer = bytearray([buffer[-1]])
if not self.running:
return None
# Read type and length (3 bytes)
header_rest = self.serial.read(3)
if len(header_rest) < 3:
return None
buffer.extend(header_rest)
# Extract payload length
payload_len = (header_rest[2] << 8) | header_rest[1]
# Read payload + CRC (2 bytes) + end marker (1 byte)
remaining = payload_len + 3
remaining_data = self.serial.read(remaining)
if len(remaining_data) < remaining:
logger.warning(f"Incomplete packet: expected {remaining}, got {len(remaining_data)}")
return None
buffer.extend(remaining_data)
# Parse packet
packet = ResponsePacket.from_bytes(bytes(buffer))
return packet
except Exception as e:
logger.error(f"Error reading packet: {e}")
if self.on_error:
self.on_error(e)
return None
def _reader_thread(self):
"""Background thread for reading serial data."""
logger.info("Serial reader thread started")
while self.running:
packet = self._read_packet()
if not packet:
continue
try:
# Parse and dispatch based on response type
if packet.response_type == ResponseType.PROFILE_DATA:
records = packet.parse_payload()
if self.on_profile_data and isinstance(records, list):
self.on_profile_data(records)
elif packet.response_type == ResponseType.METADATA:
metadata = packet.parse_payload()
if self.on_metadata and isinstance(metadata, Metadata):
self.on_metadata(metadata)
elif packet.response_type == ResponseType.STATUS:
status = packet.parse_payload()
if self.on_status and isinstance(status, StatusInfo):
self.on_status(status)
elif packet.response_type == ResponseType.ACK:
logger.debug("Received ACK")
elif packet.response_type == ResponseType.NACK:
logger.warning("Received NACK")
except Exception as e:
logger.error(f"Error processing packet: {e}")
if self.on_error:
self.on_error(e)
logger.info("Serial reader thread stopped")
def start_reading(self) -> bool:
"""Start background thread to read serial data.
Returns:
True if thread started successfully, False otherwise
"""
if self.running:
logger.warning("Reader thread already running")
return False
if not self.serial or not self.serial.is_open:
logger.error("Serial port not open")
return False
self.running = True
self.read_thread = threading.Thread(target=self._reader_thread, daemon=True)
self.read_thread.start()
logger.info("Started serial reading thread")
return True
def stop_reading(self):
"""Stop the background reading thread."""
if not self.running:
return
logger.info("Stopping serial reader thread...")
self.running = False
if self.read_thread:
self.read_thread.join(timeout=2.0)
if self.read_thread.is_alive():
logger.warning("Reader thread did not stop cleanly")
else:
logger.info("Reader thread stopped")
def __enter__(self):
"""Context manager entry."""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.stop_reading()
self.disconnect()