- Contains the host code with a protocol implementation, data analyser and web-based visualiser
268 lines
8.7 KiB
Python
268 lines
8.7 KiB
Python
"""Serial communication module for MiniProfiler.
|
|
|
|
Handles UART communication with the embedded device, including
|
|
sending commands and receiving profiling data.
|
|
"""
|
|
|
|
import serial
|
|
import threading
|
|
import time
|
|
import logging
|
|
from typing import Callable, Optional, List
|
|
from queue import Queue
|
|
|
|
from .protocol import (
|
|
Command, CommandPacket, ResponsePacket, ResponseType,
|
|
ProfileRecord, Metadata, StatusInfo
|
|
)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SerialReader:
|
|
"""Manages serial communication with the embedded profiling device."""
|
|
|
|
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0):
|
|
"""Initialize serial reader.
|
|
|
|
Args:
|
|
port: Serial port name (e.g., '/dev/ttyUSB0', 'COM3')
|
|
baudrate: Baud rate (default: 115200)
|
|
timeout: Read timeout in seconds
|
|
"""
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.timeout = timeout
|
|
self.serial: Optional[serial.Serial] = None
|
|
self.running = False
|
|
self.read_thread: Optional[threading.Thread] = None
|
|
self.command_queue = Queue()
|
|
|
|
# Callbacks
|
|
self.on_profile_data: Optional[Callable[[List[ProfileRecord]], None]] = None
|
|
self.on_metadata: Optional[Callable[[Metadata], None]] = None
|
|
self.on_status: Optional[Callable[[StatusInfo], None]] = None
|
|
self.on_error: Optional[Callable[[Exception], None]] = None
|
|
|
|
def connect(self) -> bool:
|
|
"""Open the serial port connection.
|
|
|
|
Returns:
|
|
True if connection successful, False otherwise
|
|
"""
|
|
try:
|
|
self.serial = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baudrate,
|
|
timeout=self.timeout,
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE
|
|
)
|
|
logger.info(f"Connected to {self.port} at {self.baudrate} baud")
|
|
return True
|
|
except serial.SerialException as e:
|
|
logger.error(f"Failed to connect to {self.port}: {e}")
|
|
if self.on_error:
|
|
self.on_error(e)
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""Close the serial port connection."""
|
|
if self.serial and self.serial.is_open:
|
|
self.serial.close()
|
|
logger.info(f"Disconnected from {self.port}")
|
|
|
|
def send_command(self, cmd: Command, payload: bytes = b'') -> bool:
|
|
"""Send a command to the embedded device.
|
|
|
|
Args:
|
|
cmd: Command to send
|
|
payload: Optional command payload
|
|
|
|
Returns:
|
|
True if command sent successfully, False otherwise
|
|
"""
|
|
if not self.serial or not self.serial.is_open:
|
|
logger.error("Serial port not open")
|
|
return False
|
|
|
|
try:
|
|
packet = CommandPacket(cmd, payload)
|
|
data = packet.to_bytes()
|
|
self.serial.write(data)
|
|
logger.debug(f"Sent command: {cmd.name}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to send command {cmd.name}: {e}")
|
|
if self.on_error:
|
|
self.on_error(e)
|
|
return False
|
|
|
|
def start_profiling(self) -> bool:
|
|
"""Send START_PROFILING command."""
|
|
return self.send_command(Command.START_PROFILING)
|
|
|
|
def stop_profiling(self) -> bool:
|
|
"""Send STOP_PROFILING command."""
|
|
return self.send_command(Command.STOP_PROFILING)
|
|
|
|
def get_metadata(self) -> bool:
|
|
"""Request metadata from the device."""
|
|
return self.send_command(Command.GET_METADATA)
|
|
|
|
def get_status(self) -> bool:
|
|
"""Request status from the device."""
|
|
return self.send_command(Command.GET_STATUS)
|
|
|
|
def reset_buffers(self) -> bool:
|
|
"""Send RESET_BUFFERS command."""
|
|
return self.send_command(Command.RESET_BUFFERS)
|
|
|
|
def _read_packet(self) -> Optional[ResponsePacket]:
|
|
"""Read a response packet from the serial port.
|
|
|
|
Returns:
|
|
ResponsePacket if valid packet received, None otherwise
|
|
"""
|
|
if not self.serial or not self.serial.is_open:
|
|
return None
|
|
|
|
buffer = bytearray()
|
|
|
|
try:
|
|
# Search for header (0xAA55)
|
|
while self.running:
|
|
byte = self.serial.read(1)
|
|
if not byte:
|
|
continue
|
|
|
|
buffer.append(byte[0])
|
|
|
|
# Check for header
|
|
if len(buffer) >= 2:
|
|
header = (buffer[-2] << 8) | buffer[-1]
|
|
if header == 0xAA55:
|
|
# Found header, read rest of packet
|
|
buffer = bytearray([0xAA, 0x55])
|
|
break
|
|
# Keep only last byte for next iteration
|
|
if len(buffer) > 1:
|
|
buffer = bytearray([buffer[-1]])
|
|
|
|
if not self.running:
|
|
return None
|
|
|
|
# Read type and length (3 bytes)
|
|
header_rest = self.serial.read(3)
|
|
if len(header_rest) < 3:
|
|
return None
|
|
buffer.extend(header_rest)
|
|
|
|
# Extract payload length
|
|
payload_len = (header_rest[2] << 8) | header_rest[1]
|
|
|
|
# Read payload + CRC (2 bytes) + end marker (1 byte)
|
|
remaining = payload_len + 3
|
|
remaining_data = self.serial.read(remaining)
|
|
if len(remaining_data) < remaining:
|
|
logger.warning(f"Incomplete packet: expected {remaining}, got {len(remaining_data)}")
|
|
return None
|
|
|
|
buffer.extend(remaining_data)
|
|
|
|
# Parse packet
|
|
packet = ResponsePacket.from_bytes(bytes(buffer))
|
|
return packet
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading packet: {e}")
|
|
if self.on_error:
|
|
self.on_error(e)
|
|
return None
|
|
|
|
def _reader_thread(self):
|
|
"""Background thread for reading serial data."""
|
|
logger.info("Serial reader thread started")
|
|
|
|
while self.running:
|
|
packet = self._read_packet()
|
|
if not packet:
|
|
continue
|
|
|
|
try:
|
|
# Parse and dispatch based on response type
|
|
if packet.response_type == ResponseType.PROFILE_DATA:
|
|
records = packet.parse_payload()
|
|
if self.on_profile_data and isinstance(records, list):
|
|
self.on_profile_data(records)
|
|
|
|
elif packet.response_type == ResponseType.METADATA:
|
|
metadata = packet.parse_payload()
|
|
if self.on_metadata and isinstance(metadata, Metadata):
|
|
self.on_metadata(metadata)
|
|
|
|
elif packet.response_type == ResponseType.STATUS:
|
|
status = packet.parse_payload()
|
|
if self.on_status and isinstance(status, StatusInfo):
|
|
self.on_status(status)
|
|
|
|
elif packet.response_type == ResponseType.ACK:
|
|
logger.debug("Received ACK")
|
|
|
|
elif packet.response_type == ResponseType.NACK:
|
|
logger.warning("Received NACK")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing packet: {e}")
|
|
if self.on_error:
|
|
self.on_error(e)
|
|
|
|
logger.info("Serial reader thread stopped")
|
|
|
|
def start_reading(self) -> bool:
|
|
"""Start background thread to read serial data.
|
|
|
|
Returns:
|
|
True if thread started successfully, False otherwise
|
|
"""
|
|
if self.running:
|
|
logger.warning("Reader thread already running")
|
|
return False
|
|
|
|
if not self.serial or not self.serial.is_open:
|
|
logger.error("Serial port not open")
|
|
return False
|
|
|
|
self.running = True
|
|
self.read_thread = threading.Thread(target=self._reader_thread, daemon=True)
|
|
self.read_thread.start()
|
|
logger.info("Started serial reading thread")
|
|
return True
|
|
|
|
def stop_reading(self):
|
|
"""Stop the background reading thread."""
|
|
if not self.running:
|
|
return
|
|
|
|
logger.info("Stopping serial reader thread...")
|
|
self.running = False
|
|
|
|
if self.read_thread:
|
|
self.read_thread.join(timeout=2.0)
|
|
if self.read_thread.is_alive():
|
|
logger.warning("Reader thread did not stop cleanly")
|
|
else:
|
|
logger.info("Reader thread stopped")
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry."""
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit."""
|
|
self.stop_reading()
|
|
self.disconnect()
|