- Contains the host code with a protocol implementation, data analyser and web-based visualiser
237 lines
7.2 KiB
Python
237 lines
7.2 KiB
Python
"""Sample data generator for testing the profiler without hardware.
|
|
|
|
Generates realistic profiling data that mimics an embedded application
|
|
with nested function calls, varying execution times, and typical patterns.
|
|
"""
|
|
|
|
import random
|
|
from typing import List
|
|
from miniprofiler.protocol import ProfileRecord, Metadata, ResponsePacket, ResponseType
|
|
import struct
|
|
|
|
|
|
# Sample function addresses (simulating typical embedded firmware)
|
|
# Address -> symbol name table for the simulated firmware image.
# NOTE: insertion order matters. generate_nested_calls() slices the key list
# by position to pick functions for deeper stack levels, so keep entries in
# this order when editing.
FUNCTIONS: dict[int, str] = {
    # Start-up path
    0x0800_0100: "main",
    0x0800_0200: "app_init",
    0x0800_0220: "peripheral_init",
    0x0800_0240: "clock_config",
    # Main loop and sensor handling
    0x0800_0300: "app_loop",
    0x0800_0320: "process_sensors",
    0x0800_0340: "read_temperature",
    0x0800_0360: "read_pressure",
    # Data processing
    0x0800_0380: "process_data",
    0x0800_03A0: "calculate_average",
    0x0800_03C0: "apply_filter",
    # Output
    0x0800_0400: "update_display",
    0x0800_0420: "format_string",
    0x0800_0440: "send_uart",
    # Interrupt handling
    0x0800_0500: "handle_interrupt",
    0x0800_0520: "gpio_callback",
}
|
|
|
|
|
|
def generate_metadata() -> Metadata:
    """Build the fixed Metadata object used for sample/test sessions.

    Returns:
        A Metadata instance populated with constant sample values.
    """
    sample_fields = {
        "mcu_clock_hz": 168_000_000,  # 168 MHz (typical STM32F4)
        "timer_freq": 1_000_000,  # 1 MHz timer
        "elf_build_id": 0xDEADBEEF,
        "fw_version": "v1.0.0-test",
    }
    return Metadata(**sample_fields)
|
|
|
|
|
|
def generate_nested_calls(
    start_time: int,
    depth: int = 0,
    max_depth: int = 5
) -> tuple[List[ProfileRecord], int]:
    """Generate one simulated call and, recursively, the calls nested in it.

    The function at ``depth`` 0 is always ``app_loop``; at depth 1 it is one
    of ``process_sensors`` / ``process_data`` / ``update_display``; deeper
    levels pick from a six-entry window of ``FUNCTIONS`` that slides with
    the depth. Timings are random: 1-10 us entry overhead, 5-20 us after
    each child call, and 10-200 us of self time.

    Args:
        start_time: Starting timestamp in microseconds
        depth: Current call stack depth
        max_depth: Maximum recursion depth; no children are generated at or
            beyond this depth

    Returns:
        Tuple of (list of ProfileRecords, end_time). The record for this
        call comes first, followed by its descendants in call order.
    """
    current_time = start_time

    # Select the function for this level and how many callees it makes.
    if depth == 0:
        func_addr = 0x08000300  # app_loop
        num_children = random.randint(2, 4)
    elif depth == 1:
        func_addr = random.choice([0x08000320, 0x08000380, 0x08000400])
        num_children = random.randint(1, 3)
    else:
        addresses = list(FUNCTIONS)
        candidates = addresses[depth * 2:depth * 2 + 6]
        if not candidates:
            # The sliding window runs off the end of the table for deep
            # stacks (depth >= 8 with 16 entries); random.choice() would
            # raise IndexError on the empty slice, so reuse the last window.
            candidates = addresses[-6:]
        func_addr = random.choice(candidates)
        num_children = random.randint(0, 2) if depth < max_depth else 0

    entry_time = current_time
    current_time += random.randint(1, 10)  # Entry overhead

    # Generate child calls; each child starts where the previous one ended.
    children_records: List[ProfileRecord] = []
    if depth < max_depth:
        for _ in range(num_children):
            child_records, current_time = generate_nested_calls(
                current_time, depth + 1, max_depth
            )
            children_records.extend(child_records)
            current_time += random.randint(5, 20)  # Gap between children

    # Add some self-time for this function
    current_time += random.randint(10, 200)
    exit_time = current_time

    # The duration is inclusive of all children generated above.
    record = ProfileRecord(
        func_addr=func_addr,
        entry_time=entry_time,
        duration_us=exit_time - entry_time,
        depth=depth
    )

    return [record, *children_records], exit_time
|
|
|
|
|
|
def generate_sample_profile_data(
    num_iterations: int = 10,
    time_per_iteration: int = 10000  # 10ms per iteration
) -> List[ProfileRecord]:
    """Generate sample profiling data simulating multiple loop iterations.

    The output starts with a fixed four-record initialization sequence
    (``main`` -> ``app_init`` -> ``peripheral_init`` / ``clock_config``)
    covering the first 5000 us, followed by ``num_iterations`` randomly
    generated ``app_loop`` call trees.

    Args:
        num_iterations: Number of main loop iterations
        time_per_iteration: Minimum spacing, in microseconds, between the
            start of consecutive iterations. An iteration that runs longer
            than this is followed only by the random 50-200 us idle gap.
            Pass 0 to get back-to-back iterations.

    Returns:
        List of ProfileRecord objects
    """
    current_time = 0

    # Generate initialization sequence (runs once)
    all_records = [
        ProfileRecord(0x08000100, current_time, 5000, 0),  # main
        ProfileRecord(0x08000200, current_time + 100, 1000, 1),  # app_init
        ProfileRecord(0x08000220, current_time + 150, 300, 2),  # peripheral_init
        ProfileRecord(0x08000240, current_time + 500, 400, 2),  # clock_config
    ]
    current_time += 5000

    # Generate main loop iterations
    for _ in range(num_iterations):
        iteration_start = current_time
        records, end_time = generate_nested_calls(
            current_time,
            depth=0,
            max_depth=4
        )
        all_records.extend(records)

        idle_end = end_time + random.randint(50, 200)  # Idle time
        # Honour the requested loop period: the next iteration never starts
        # earlier than one period after this one began.
        current_time = max(idle_end, iteration_start + time_per_iteration)

    return all_records
|
|
|
|
|
|
def generate_profile_data_packet(records: List[ProfileRecord]) -> bytes:
    """Generate a binary PROFILE_DATA response packet.

    The payload is a 3-byte little-endian header (1-byte version ``0x01``
    and a 2-byte unsigned record count) followed by each record's
    ``to_bytes()`` output, in order.

    Args:
        records: List of ProfileRecord objects

    Returns:
        Binary packet data

    Raises:
        struct.error: If ``len(records)`` does not fit the 16-bit count field.
    """
    # Build payload: version (1B) + count (2B) + records
    header = struct.pack('<BH', 0x01, len(records))

    # Join once instead of repeated `+=`, which re-copies the growing payload
    # for every record.
    payload = b"".join([header, *(record.to_bytes() for record in records)])

    # Create response packet
    packet = ResponsePacket(ResponseType.PROFILE_DATA, payload)
    return packet.to_bytes()
|
|
|
|
|
|
def generate_metadata_packet() -> bytes:
    """Serialize the sample metadata as a METADATA response packet.

    Returns:
        Binary packet data
    """
    payload = generate_metadata().to_bytes()
    return ResponsePacket(ResponseType.METADATA, payload).to_bytes()
|
|
|
|
|
|
def save_sample_data(
    filename: str,
    num_iterations: int = 50,
    chunk_size: int = 20
) -> None:
    """Generate and save sample profiling data to a file.

    The file contains one METADATA packet followed by PROFILE_DATA packets,
    each carrying at most ``chunk_size`` records, to simulate streaming.
    A summary line is printed once the file has been written.

    Args:
        filename: Output filename
        num_iterations: Number of loop iterations to generate
        chunk_size: Maximum number of records per PROFILE_DATA packet.
            Must be between 1 and 65535, because the packet's record count
            is packed as an unsigned 16-bit field.

    Raises:
        ValueError: If ``chunk_size`` is outside 1..65535.
    """
    # Reject sizes that would otherwise fail obscurely: 0 makes range() raise,
    # a negative step silently writes no records, and more than 0xFFFF
    # overflows the count field.
    if not 1 <= chunk_size <= 0xFFFF:
        raise ValueError(
            f"chunk_size must be between 1 and 65535, got {chunk_size}"
        )

    records = generate_sample_profile_data(num_iterations)

    with open(filename, 'wb') as f:
        # Write metadata packet
        f.write(generate_metadata_packet())

        # Write profile data in chunks (simulate streaming)
        for start in range(0, len(records), chunk_size):
            chunk = records[start:start + chunk_size]
            f.write(generate_profile_data_packet(chunk))

    print(f"Generated {len(records)} records in {filename}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Build an in-memory data set and show a short preview of it.
    records = generate_sample_profile_data(num_iterations=20)

    print(f"Generated {len(records)} profiling records")
    print("\nFirst 10 records:")
    for index, rec in enumerate(records[:10]):
        label = FUNCTIONS.get(rec.func_addr, f"0x{rec.func_addr:08x}")
        print(f" [{index}] {label:20s} @ {rec.entry_time:8d}us, "
              f"duration: {rec.duration_us:6d}us, depth: {rec.depth}")

    # Write a binary capture. save_sample_data() generates its own
    # 50-iteration data set; it does not reuse `records` from above.
    save_sample_data("sample_profile_data.bin", num_iterations=50)

    # Also generate JSON for testing web interface (from `records`).
    import json
    from miniprofiler.analyzer import ProfileAnalyzer

    analyzer = ProfileAnalyzer()
    analyzer.add_records(records)

    # (output file, analyzer method producing its content, message prefix)
    json_exports = (
        ("sample_flamegraph.json", "to_flamegraph_json", "\n"),
        ("sample_statistics.json", "to_statistics_json", ""),
        ("sample_timeline.json", "to_timeline_json", ""),
    )
    for out_name, method_name, prefix in json_exports:
        # Look the method up per iteration so each export is produced,
        # written and reported before the next one is touched.
        export_data = getattr(analyzer, method_name)()
        with open(out_name, 'w') as out_file:
            json.dump(export_data, out_file, indent=2)
        print(f"{prefix}Generated {out_name}")
|