Files
MiniProfiler/host/miniprofiler/protocol.py
2025-11-28 08:26:21 +05:30

289 lines
9.0 KiB
Python

"""Protocol definitions and packet structures for MiniProfiler.
This module defines the binary protocol used for communication between
the embedded device and the host application.
"""
import struct
from enum import IntEnum
from dataclasses import dataclass
from typing import List, Optional
import crc
class Command(IntEnum):
"""Commands sent from host to embedded device."""
START_PROFILING = 0x01
STOP_PROFILING = 0x02
GET_STATUS = 0x03
RESET_BUFFERS = 0x04
GET_METADATA = 0x05
SET_CONFIG = 0x06
class ResponseType(IntEnum):
"""Response types from embedded device to host."""
ACK = 0x01
NACK = 0x02
METADATA = 0x03
STATUS = 0x04
PROFILE_DATA = 0x05
# Protocol constants
COMMAND_HEADER = 0x55
RESPONSE_HEADER = 0xAA55
RESPONSE_END = 0x0A
PROTOCOL_VERSION = 0x01
@dataclass
class ProfileRecord:
"""A single profiling record from the embedded device.
Attributes:
func_addr: Function address (from instrumentation)
entry_time: Entry timestamp in microseconds
duration_us: Function duration in microseconds
depth: Call stack depth
"""
func_addr: int
entry_time: int
duration_us: int
depth: int
@classmethod
def from_bytes(cls, data: bytes) -> 'ProfileRecord':
"""Parse a ProfileRecord from binary data.
Format: <I (func_addr) <I (entry_time) <I (duration_us) <H (depth)
Total: 14 bytes
"""
if len(data) < 14:
raise ValueError(f"Invalid ProfileRecord size: {len(data)} bytes")
func_addr, entry_time, duration_us, depth = struct.unpack('<IIIH', data[:14])
return cls(func_addr, entry_time, duration_us, depth)
def to_bytes(self) -> bytes:
"""Serialize ProfileRecord to binary format."""
return struct.pack('<IIIH', self.func_addr, self.entry_time,
self.duration_us, self.depth)
@dataclass
class Metadata:
"""Metadata packet sent by embedded device at startup.
Attributes:
mcu_clock_hz: MCU clock frequency in Hz
timer_freq: Timer frequency in Hz
elf_build_id: CRC32 of .text section for version matching
fw_version: Firmware version string
"""
mcu_clock_hz: int
timer_freq: int
elf_build_id: int
fw_version: str
@classmethod
def from_bytes(cls, data: bytes) -> 'Metadata':
"""Parse Metadata from binary data.
Format: <I (mcu_clock) <I (timer_freq) <I (build_id) 16s (fw_version)
Total: 28 bytes
"""
if len(data) < 28:
raise ValueError(f"Invalid Metadata size: {len(data)} bytes")
mcu_clock_hz, timer_freq, elf_build_id, fw_version_bytes = struct.unpack(
'<III16s', data[:28]
)
fw_version = fw_version_bytes.decode('utf-8').rstrip('\x00')
return cls(mcu_clock_hz, timer_freq, elf_build_id, fw_version)
def to_bytes(self) -> bytes:
"""Serialize Metadata to binary format."""
fw_version_bytes = self.fw_version.encode('utf-8')[:16].ljust(16, b'\x00')
return struct.pack('<III16s', self.mcu_clock_hz, self.timer_freq,
self.elf_build_id, fw_version_bytes)
@dataclass
class StatusInfo:
"""Status information from embedded device.
Attributes:
is_profiling: Whether profiling is currently active
buffer_overflows: Number of buffer overflow events
records_captured: Total number of records captured
buffer_usage_percent: Current buffer usage percentage
"""
is_profiling: bool
buffer_overflows: int
records_captured: int
buffer_usage_percent: int
@classmethod
def from_bytes(cls, data: bytes) -> 'StatusInfo':
"""Parse StatusInfo from binary data.
Format: <B (is_profiling) <I (overflows) <I (records) <B (buffer_pct)
Total: 10 bytes
"""
if len(data) < 10:
raise ValueError(f"Invalid StatusInfo size: {len(data)} bytes")
is_profiling, buffer_overflows, records_captured, buffer_usage_percent = \
struct.unpack('<BIIB', data[:10])
return cls(bool(is_profiling), buffer_overflows, records_captured,
buffer_usage_percent)
class CommandPacket:
"""Command packet sent from host to embedded device."""
def __init__(self, cmd: Command, payload: bytes = b''):
"""Create a command packet.
Args:
cmd: Command type
payload: Optional payload (max 8 bytes)
"""
if len(payload) > 8:
raise ValueError("Command payload cannot exceed 8 bytes")
self.cmd = cmd
self.payload = payload.ljust(8, b'\x00')
def to_bytes(self) -> bytes:
"""Serialize command packet to binary format.
Format: <B (header) <B (cmd) <B (payload_len) 8s (payload) <B (checksum)
Total: 12 bytes
"""
payload_len = len(self.payload.rstrip(b'\x00'))
data = struct.pack('<BBB8s', COMMAND_HEADER, self.cmd, payload_len, self.payload)
checksum = sum(data) & 0xFF
return data + struct.pack('<B', checksum)
class ResponsePacket:
"""Response packet from embedded device to host."""
def __init__(self, response_type: ResponseType, payload: bytes):
"""Create a response packet.
Args:
response_type: Type of response
payload: Response payload
"""
self.response_type = response_type
self.payload = payload
@staticmethod
def calculate_crc16(data: bytes) -> int:
"""Calculate CRC16-CCITT for data validation."""
# CRC-16-CCITT: polynomial 0x1021, init 0xFFFF
calculator = crc.Calculator(crc.Configuration(
width=16,
polynomial=0x1021,
init_value=0xFFFF,
final_xor_value=0x0000,
reverse_input=False,
reverse_output=False
))
return calculator.checksum(data)
def to_bytes(self) -> bytes:
"""Serialize response packet to binary format.
Format: <H (header) <B (type) <H (payload_len) payload <H (crc16) <B (end)
"""
payload_len = len(self.payload)
header_data = struct.pack('<HBH', RESPONSE_HEADER, self.response_type,
payload_len)
data = header_data + self.payload
crc16 = self.calculate_crc16(data)
return data + struct.pack('<HB', crc16, RESPONSE_END)
@classmethod
def from_bytes(cls, data: bytes) -> Optional['ResponsePacket']:
"""Parse response packet from binary data.
Returns:
ResponsePacket if valid, None otherwise
"""
if len(data) < 8: # Minimum packet size
return None
# Parse header
header, response_type, payload_len = struct.unpack('<HBH', data[:5])
if header != RESPONSE_HEADER:
raise ValueError(f"Invalid response header: 0x{header:04X}")
# Check if we have enough data
total_len = 5 + payload_len + 3 # header + payload + crc + end
if len(data) < total_len:
return None
# Extract payload
payload = data[5:5+payload_len]
# Verify CRC
crc16_expected = struct.unpack('<H', data[5+payload_len:5+payload_len+2])[0]
crc16_actual = cls.calculate_crc16(data[:5+payload_len])
if crc16_expected != crc16_actual:
raise ValueError(f"CRC mismatch: expected 0x{crc16_expected:04X}, "
f"got 0x{crc16_actual:04X}")
# Verify end marker
end_marker = data[5+payload_len+2]
if end_marker != RESPONSE_END:
raise ValueError(f"Invalid end marker: 0x{end_marker:02X}")
return cls(ResponseType(response_type), payload)
def parse_payload(self) -> object:
"""Parse the payload based on response type.
Returns:
Parsed payload object (Metadata, StatusInfo, List[ProfileRecord], etc.)
"""
if self.response_type == ResponseType.METADATA:
return Metadata.from_bytes(self.payload)
elif self.response_type == ResponseType.STATUS:
return StatusInfo.from_bytes(self.payload)
elif self.response_type == ResponseType.PROFILE_DATA:
# First byte is protocol version
if len(self.payload) < 3:
return []
version = self.payload[0]
if version != PROTOCOL_VERSION:
raise ValueError(f"Unsupported protocol version: {version}")
record_count = struct.unpack('<H', self.payload[1:3])[0]
records = []
offset = 3
for _ in range(record_count):
if offset + 14 > len(self.payload):
break
record = ProfileRecord.from_bytes(self.payload[offset:offset+14])
records.append(record)
offset += 14
return records
elif self.response_type == ResponseType.ACK:
return True
elif self.response_type == ResponseType.NACK:
return False
return self.payload