benchmark
| name | benchmark |
| description | Write benchmark scripts for EmbodiChain modules following project conventions |
This skill guides you through writing well-structured benchmark scripts for EmbodiChain modules, covering performance measurement of solvers, samplers, metrics, and other computationally intensive components.
Invoke this skill when:
- scripts/benchmark/ or the word "benchmark" appears in the request

Place benchmark scripts under:
scripts/benchmark/<domain>/<benchmark_name>.py
Examples:
- scripts/benchmark/robotics/kinematic_solver/opw_solver.py
- scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py

Every benchmark file must begin with the Apache 2.0 copyright header followed by a module-level docstring:
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
"""One-line summary of what this benchmark measures.
Longer description of the optimizations or comparisons being evaluated.
Run: python -m scripts.benchmark.<domain>.<benchmark_name>
"""
Ask yourself:
- [100, 1000, 10000, 100000]

Use one helper function per concern, then a single orchestrator:
benchmark_<component_a>() # e.g., benchmark_halton_sampler()
benchmark_<component_b>() # e.g., benchmark_density_metric()
...
run_all_benchmarks() # calls all of the above + prints header/footer
Each benchmark function follows this pattern:
def benchmark_<name>():
    """One-line description of what is being measured."""
    from embodichain.<module.path> import SomeClass, SomeCfg

    # --- Setup (not timed) ---
    cfg = SomeCfg(...)
    obj = cfg.init_solver(...)  # or SomeClass(cfg)

    print("\n=== <Name> Benchmark ===")
    for n in [100, 1000, 10000, 100000]:
        # Prepare inputs (not timed)
        inputs = ...

        # --- Timed block ---
        start = time.perf_counter()
        result = obj.compute(inputs)  # or obj.get_ik(...) etc.
        elapsed = time.perf_counter() - start

        print(f" n={n:>7d}: {elapsed*1000:>10.2f} ms (...)")
Key rules:
- Use time.perf_counter() for high-resolution wall-clock timing, not time.time().
- Report times in milliseconds (elapsed * 1000) with consistent column alignment using > format specs.

Exception: When benchmarking GPU (Warp/CUDA) code alongside a CPU baseline, it is acceptable to use time.time() for coarser comparison timing, as seen in opw_solver.py. Prefer time.perf_counter() for CPU-only benchmarks.
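The timing rules above can be wrapped in a small reusable helper. The sketch below is an illustration, not part of EmbodiChain: the name time_call and its repeats, warmup, and sync parameters are hypothetical. Unlike the single-shot pattern above, it adds warm-up runs and keeps the best of several repeats. The optional sync callable is meant for asynchronous GPU work, where a function such as torch.cuda.synchronize could be passed so that the clock stops only after the device has finished.

```python
import time
from typing import Any, Callable, Optional


def time_call(
    fn: Callable[..., Any],
    *args: Any,
    repeats: int = 5,
    warmup: int = 1,
    sync: Optional[Callable[[], None]] = None,
) -> float:
    """Return the best wall-clock time of fn(*args) in milliseconds (hypothetical helper)."""
    for _ in range(warmup):
        fn(*args)  # warm-up runs are executed but not timed
    best = float("inf")
    for _ in range(repeats):
        if sync is not None:
            sync()  # drain pending asynchronous work before starting the clock
        start = time.perf_counter()
        fn(*args)
        if sync is not None:
            sync()  # wait for the timed work to finish
        best = min(best, time.perf_counter() - start)
    return best * 1000.0


# Stand-in workload: sorting a reversed list at increasing sizes.
for n in [100, 1000, 10000]:
    data = list(range(n, 0, -1))
    ms = time_call(sorted, data)
    print(f" n={n:>7d}: {ms:>10.2f} ms")
```

Taking the minimum over repeats reduces noise from other processes; report the mean instead if typical latency matters more than best-case latency.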
When the benchmark compares two backends (e.g., Warp CUDA vs. Python OPW):
def check_<name>(solver_a, solver_b, n_samples=1000):
    """Run both solvers and return timing + accuracy metrics."""
    # shared input generation
    qpos = ...
    xpos = ...  # target poses passed to both solvers (e.g. FK of qpos)

    # --- Solver A (e.g., Warp CUDA) ---
    start = time.time()
    success_a, result_a = solver_a.get_ik(xpos, ...)
    time_a = time.time() - start
    t_err_a, r_err_a = get_poses_err(...)

    # --- Solver B (e.g., CPU) ---
    start = time.time()
    success_b, result_b = solver_b.get_ik(xpos, ...)
    time_b = time.time() - start
    t_err_b, r_err_b = get_poses_err(...)

    return time_a, t_err_a, r_err_a, time_b, t_err_b, r_err_b


def benchmark_<name>():
    cfg = ...
    solver_a = cfg.init_solver(device=torch.device("cuda"), ...)
    solver_b = cfg.init_solver(device=torch.device("cpu"), ...)

    for n in [100, 1000, 10000, 100000]:
        time_a, t_err_a, r_err_a, time_b, t_err_b, r_err_b = check_<name>(
            solver_a, solver_b, n_samples=n
        )
        print(f"**** Test over {n} samples:")
        print(f"===Impl A time: {time_a * 1000:.6f} ms")
        print(f" Translation mean error: {t_err_a * 1000:.6f} mm")
        print(f" Rotation mean error: {r_err_a * 180 / np.pi:.6f} degrees")
        print(f"===Impl B time: {time_b * 1000:.6f} ms")
        ...
For FK/IK solvers, always verify correctness by running FK on the IK output and measuring pose error:
def get_pose_err(matrix_a: np.ndarray, matrix_b: np.ndarray) -> tuple[float, float]:
    """Return (translation_error_m, rotation_error_rad)."""
    t_err = np.linalg.norm(matrix_a[:3, 3] - matrix_b[:3, 3])
    relative_rot = matrix_a[:3, :3].T @ matrix_b[:3, :3]
    cos_angle = np.clip((np.trace(relative_rot) - 1) / 2.0, -1.0, 1.0)
    r_err = np.arccos(cos_angle)
    return t_err, r_err


def get_poses_err(
    matrix_a_list: list[np.ndarray], matrix_b_list: list[np.ndarray]
) -> tuple[float, float]:
    t_errs, r_errs = [], []
    for a, b in zip(matrix_a_list, matrix_b_list):
        t, r = get_pose_err(a, b)
        t_errs.append(t)
        r_errs.append(r)
    return float(np.mean(t_errs)), float(np.mean(r_errs))
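For large sample counts, the per-pose Python loop can itself become noticeable. As a sketch only, the same mean errors can be computed in one vectorized pass over stacked (N, 4, 4) arrays. The function name get_poses_err_batched is hypothetical and not part of EmbodiChain. It assumes both inputs are homogeneous transforms of equal length.

```python
import numpy as np


def get_poses_err_batched(poses_a, poses_b) -> tuple[float, float]:
    """Return mean (translation_error_m, rotation_error_rad) over N pose pairs."""
    a = np.asarray(poses_a, dtype=float)  # shape (N, 4, 4)
    b = np.asarray(poses_b, dtype=float)  # shape (N, 4, 4)

    # Euclidean distance between the translation columns.
    t_errs = np.linalg.norm(a[:, :3, 3] - b[:, :3, 3], axis=1)

    # Relative rotation A^T @ B for every pair, then the angle from its trace.
    rel = np.einsum("nji,njk->nik", a[:, :3, :3], b[:, :3, :3])
    cos_angle = np.clip((np.trace(rel, axis1=1, axis2=2) - 1.0) / 2.0, -1.0, 1.0)
    r_errs = np.arccos(cos_angle)

    return float(t_errs.mean()), float(r_errs.mean())


# Worked example: one identical pair, one pair offset by 1 m and 90 degrees about z.
identity = np.eye(4)
rotated = np.eye(4)
rotated[:3, :3] = [[0.0, -1.0, 0.0], [1.0, 0.0, 0.0], [0.0, 0.0, 1.0]]
rotated[:3, 3] = [1.0, 0.0, 0.0]

t_mean, r_mean = get_poses_err_batched([identity, identity], [identity, rotated])
print(f"mean translation error: {t_mean:.3f} m, mean rotation error: {r_mean:.3f} rad")
```

The example averages a zero-error pair with a pair that differs by 1 m and pi/2 rad, so the means are 0.5 m and pi/4 rad.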
If a benchmark requires a live simulation, robot, or GPU device that may not be available, skip gracefully rather than raising an error:
def benchmark_batch_fk():
    """Benchmark batch FK (requires GPU robot setup)."""
    print("\n=== Batch FK Benchmark (requires robot/simulation) ===")
    print(" Skipped -- requires live SimulationManager and Robot.")
    print(" To run manually, integrate with your robot setup:")
    print(" analyzer.compute_workspace_points(joint_configs, batch_size=512)")
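When the missing requirement is an optional Python package rather than a live robot, the skip can be decided at runtime. The following is a minimal sketch under that assumption. The helpers missing_modules and run_or_skip are hypothetical names, and the check only covers top-level importable modules, not devices or simulators.

```python
import importlib.util
from typing import Any, Callable, Optional


def missing_modules(names: list[str]) -> list[str]:
    """Return the top-level module names that cannot be imported."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def run_or_skip(
    title: str, required: list[str], benchmark: Callable[[], Any]
) -> Optional[Any]:
    """Run the benchmark, or print a skip notice when a requirement is missing."""
    print(f"\n=== {title} Benchmark ===")
    missing = missing_modules(required)
    if missing:
        print(f" Skipped -- missing modules: {', '.join(missing)}")
        return None
    return benchmark()


# The second call is skipped because the module name is deliberately bogus.
run_or_skip("JSON Encoding", ["json"], lambda: print(" ran"))
run_or_skip("Imaginary Backend", ["no_such_module_abc123"], lambda: print(" ran"))
```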
def run_all_benchmarks():
    """Run all benchmarks and print summary."""
    print("=" * 60)
    print("<Module Name> Performance Benchmarks")
    print("=" * 60)

    benchmark_component_a()
    benchmark_component_b()
    # ...

    print("\n" + "=" * 60)
    print("Benchmarks complete.")
    print("=" * 60)


if __name__ == "__main__":
    run_all_benchmarks()
Every benchmark script must write its final results to one Markdown file after execution.
- outputs/benchmarks/<benchmark_name>_<YYYYMMDD_HHMMSS>.md
- Time & Memory table (cost time + memory columns)
- Success & Other Metrics table (success rate + quality/accuracy/extra metrics)
- Leaderboard table (algorithm ranking by overall success rate, descending)

Leaderboard coverage rule: include all algorithms evaluated in the current benchmark scope. If a provided leaderboard artifact is incomplete, backfill missing algorithms from aggregate summaries before rendering.

Use this pattern:
from datetime import datetime
from pathlib import Path


def write_markdown_report(
    benchmark_name: str,
    perf_rows: list[dict[str, object]],
    metric_rows: list[dict[str, object]],
    leaderboard_rows: list[dict[str, object]],
    notes: list[str] | None = None,
) -> Path:
    """Write benchmark results into a single markdown report file."""
    output_dir = Path("outputs/benchmarks")
    output_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = output_dir / f"{benchmark_name}_{ts}.md"

    lines: list[str] = [
        f"# {benchmark_name} Benchmark Report",
        "",
        f"Generated at: {datetime.now().isoformat(timespec='seconds')}",
        "",
        "## Time & Memory",
        "",
    ]

    # Table 1: column names come from the keys of the first row.
    if perf_rows:
        perf_headers = list(perf_rows[0].keys())
        lines.append("| " + " | ".join(perf_headers) + " |")
        lines.append("| " + " | ".join(["---"] * len(perf_headers)) + " |")
        for row in perf_rows:
            lines.append("| " + " | ".join(str(row[h]) for h in perf_headers) + " |")
    else:
        lines.append("No time/memory rows were produced.")

    # Table 2: success rate and quality metrics.
    lines.extend(["", "## Success & Other Metrics", ""])
    if metric_rows:
        metric_headers = list(metric_rows[0].keys())
        lines.append("| " + " | ".join(metric_headers) + " |")
        lines.append("| " + " | ".join(["---"] * len(metric_headers)) + " |")
        for row in metric_rows:
            lines.append(
                "| " + " | ".join(str(row[h]) for h in metric_headers) + " |"
            )
    else:
        lines.append("No success/metric rows were produced.")

    # Table 3: leaderboard, already sorted by the caller.
    lines.extend(["", "## Leaderboard", ""])
    if leaderboard_rows:
        leaderboard_headers = list(leaderboard_rows[0].keys())
        lines.append("| " + " | ".join(leaderboard_headers) + " |")
        lines.append("| " + " | ".join(["---"] * len(leaderboard_headers)) + " |")
        for row in leaderboard_rows:
            lines.append(
                "| " + " | ".join(str(row[h]) for h in leaderboard_headers) + " |"
            )
    else:
        lines.append("No leaderboard rows were produced.")

    if notes:
        lines.extend(["", "## Notes", ""])
        lines.extend([f"- {note}" for note in notes])

    # Join with real newlines so each entry becomes one line of the report.
    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return report_path
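The three table sections in write_markdown_report repeat the same rendering logic. One possible way to factor it out is sketched below. render_table is a hypothetical helper, not an existing EmbodiChain function. As an added assumption, it escapes pipe characters in cell values so that a value such as "a | b" does not split a row into extra columns.

```python
def render_table(rows: list[dict[str, object]], empty_message: str) -> list[str]:
    """Render rows as Markdown table lines, or a fallback message when empty."""
    if not rows:
        return [empty_message]

    headers = list(rows[0].keys())

    def cell(value: object) -> str:
        # A raw "|" inside a cell would be read as a column separator.
        return str(value).replace("|", "\\|")

    lines = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    for row in rows:
        lines.append("| " + " | ".join(cell(row.get(h, "")) for h in headers) + " |")
    return lines


example_rows = [{"benchmark": "halton_sampler", "n": 1000, "cost_time_ms": "12.50"}]
print("\n".join(render_table(example_rows, "No time/memory rows were produced.")))
```

Inside write_markdown_report, each section would then reduce to a single lines.extend(render_table(...)) call.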
And call it at the end of run_all_benchmarks():
def run_all_benchmarks() -> None:
    perf_rows: list[dict[str, object]] = []
    metric_rows: list[dict[str, object]] = []

    perf_part, metric_part = benchmark_halton_sampler()
    perf_rows.extend(perf_part)
    metric_rows.extend(metric_part)

    perf_part, metric_part = benchmark_density_metric()
    perf_rows.extend(perf_part)
    metric_rows.extend(metric_part)
    # ...

    leaderboard_rows = build_leaderboard_rows(metric_rows)
    # `build_leaderboard_rows` should aggregate per algorithm and sort by
    # overall success rate in descending order.

    report_path = write_markdown_report(
        benchmark_name="workspace_analyzer",
        perf_rows=perf_rows,
        metric_rows=metric_rows,
        leaderboard_rows=leaderboard_rows,
        notes=["CPU/GPU memory fields are deltas measured around timed calls."],
    )
    print(f"Markdown report saved: {report_path}")
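build_leaderboard_rows is referenced above but not defined in this guide. A minimal sketch of what it might look like follows. It assumes every metric row carries an "algorithm" key (hypothetical) and a numeric "success_rate" key, and it takes the overall success rate to be the unweighted mean of an algorithm's rows. A real benchmark may need to weight by sample count instead.

```python
from collections import defaultdict


def build_leaderboard_rows(
    metric_rows: list[dict[str, object]],
) -> list[dict[str, object]]:
    """Aggregate success rates per algorithm and rank them in descending order."""
    rates: dict[str, list[float]] = defaultdict(list)
    for row in metric_rows:
        # "algorithm" and "success_rate" are assumed column names.
        rates[str(row["algorithm"])].append(float(row["success_rate"]))

    ranked = sorted(
        ((name, sum(values) / len(values)) for name, values in rates.items()),
        key=lambda item: item[1],
        reverse=True,
    )
    return [
        {"rank": rank, "algorithm": name, "overall_success_rate": f"{rate:.3f}"}
        for rank, (name, rate) in enumerate(ranked, start=1)
    ]


example_metrics = [
    {"algorithm": "solver_a", "n": 100, "success_rate": 1.0},
    {"algorithm": "solver_a", "n": 1000, "success_rate": 0.5},
    {"algorithm": "solver_b", "n": 100, "success_rate": 0.9},
]
for entry in build_leaderboard_rows(example_metrics):
    print(entry)
```

Because every algorithm present in metric_rows gets a row, the result satisfies the leaderboard coverage rule as long as metric_rows itself is complete.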
| Scenario | Print format |
|---|---|
| Single implementation, many sizes | n={n:>7d}: {elapsed*1000:>10.2f} ms \| CPU Δ={...:+.1f} MB GPU Δ={...:+.1f} MB peak GPU={...:.1f} MB |
| Two implementations compared | ===<Impl> time: {ms:.6f} ms then error & memory lines indented 3 spaces |
| Markdown report path | Markdown report saved: outputs/benchmarks/<name>_<timestamp>.md |
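| Markdown table 1 (Time & Memory) | Columns include cost_time_ms, cpu_delta_mb, gpu_delta_mb, peak_gpu_mb |
| Markdown table 2 (Success & Metrics) | Columns include success_rate and domain-specific quality metrics |
| Markdown table 3 (Leaderboard) | Algorithms ranked by overall success rate, descending |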
| Section header | \n=== <Name> Benchmark === |
| Top-level separator | "=" * 60 |
Always measure both GPU VRAM and CPU RAM alongside wall-clock time. Use the helpers below.
import time

import torch


def get_gpu_memory_mb() -> float:
    """Return current GPU VRAM allocated by PyTorch in MB."""
    if torch.cuda.is_available():
        return torch.cuda.memory_allocated() / 1024 ** 2
    return 0.0


# Usage pattern inside a benchmark loop:
torch.cuda.reset_peak_memory_stats()  # reset peak counter before timed block
mem_before = get_gpu_memory_mb()
start = time.perf_counter()
result = obj.compute(inputs)
elapsed = time.perf_counter() - start
mem_after = get_gpu_memory_mb()
peak_vram = torch.cuda.max_memory_allocated() / 1024 ** 2  # peak during timed block
print(
    f" n={n:>7d}: {elapsed*1000:>10.2f} ms | "
    f"VRAM delta={mem_after - mem_before:+.1f} MB peak={peak_vram:.1f} MB"
)
CPU RAM (psutil):

import os
import time

import psutil


def get_cpu_memory_mb() -> float:
    """Return current process RSS (resident set size) in MB."""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 ** 2


# Usage pattern:
mem_before = get_cpu_memory_mb()
start = time.perf_counter()
result = obj.compute(inputs)
elapsed = time.perf_counter() - start
mem_after = get_cpu_memory_mb()
print(
    f" n={n:>7d}: {elapsed*1000:>10.2f} ms | "
    f"RAM delta={mem_after - mem_before:+.1f} MB"
)
For benchmarks that use both CPU and GPU, combine into a single snapshot:
import os
import time

import psutil
import torch


def memory_snapshot() -> dict:
    """Return a dict with current CPU RSS and GPU allocated memory in MB."""
    process = psutil.Process(os.getpid())
    cpu_mb = process.memory_info().rss / 1024 ** 2
    gpu_mb = torch.cuda.memory_allocated() / 1024 ** 2 if torch.cuda.is_available() else 0.0
    return {"cpu_mb": cpu_mb, "gpu_mb": gpu_mb}


# Usage:
torch.cuda.reset_peak_memory_stats()
before = memory_snapshot()
start = time.perf_counter()
result = obj.compute(inputs)
elapsed = time.perf_counter() - start
after = memory_snapshot()
peak_gpu = torch.cuda.max_memory_allocated() / 1024 ** 2
print(
    f" n={n:>7d}: {elapsed*1000:>10.2f} ms | "
    f"CPU Δ={after['cpu_mb'] - before['cpu_mb']:+.1f} MB "
    f"GPU Δ={after['gpu_mb'] - before['gpu_mb']:+.1f} MB peak GPU={peak_gpu:.1f} MB"
)
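The snapshots above feed the console output, but the same numbers also need to reach the Time & Memory table of the Markdown report. The sketch below shows one way to turn a pair of snapshots into a report row. make_perf_row is a hypothetical helper, and the "benchmark" and "n" keys are illustrative. The remaining column names follow the ones this guide requires for that table.

```python
def make_perf_row(
    name: str,
    n: int,
    elapsed_s: float,
    before: dict[str, float],
    after: dict[str, float],
    peak_gpu_mb: float,
) -> dict[str, object]:
    """Build one Time & Memory row from a timing and two memory snapshots."""
    return {
        "benchmark": name,  # illustrative key
        "n": n,  # illustrative key
        "cost_time_ms": f"{elapsed_s * 1000:.2f}",
        "cpu_delta_mb": f"{after['cpu_mb'] - before['cpu_mb']:+.1f}",
        "gpu_delta_mb": f"{after['gpu_mb'] - before['gpu_mb']:+.1f}",
        "peak_gpu_mb": f"{peak_gpu_mb:.1f}",
    }


# Example with hand-written snapshot values instead of live measurements.
row = make_perf_row(
    name="halton_sampler",
    n=1000,
    elapsed_s=0.0125,
    before={"cpu_mb": 100.0, "gpu_mb": 0.0},
    after={"cpu_mb": 101.5, "gpu_mb": 2.0},
    peak_gpu_mb=3.0,
)
print(row)
```

Each benchmark function can append such rows to the perf_part list it returns to run_all_benchmarks().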
Add psutil to the project's dev-dependencies if not already present (pip install psutil).
import os
import time
import psutil
import numpy as np
import torch
import warp as wp # only when GPU kernels are benchmarked
from scipy.spatial.transform import Rotation # only when needed
from typing import Tuple, List  # or use built-in generics (Python ≥ 3.9)
Before finishing a benchmark script:
- Module docstring includes a Run: line
- Timing uses time.perf_counter() (or time.time() when comparing GPU/CPU coarsely)
- CPU memory measured with psutil (delta MB before/after timed block)
- GPU memory measured with torch.cuda.memory_allocated() + torch.cuda.max_memory_allocated() (delta + peak)
- torch.cuda.reset_peak_memory_stats() called before each timed block
- run_all_benchmarks() orchestrator with formatted separators
- Markdown report contains the Time & Memory, Success & Other Metrics, and Leaderboard tables
- Time & Memory table includes cost_time_ms, cpu_delta_mb, gpu_delta_mb, peak_gpu_mb
- Success & Other Metrics table includes success_rate and domain-specific quality metrics
- Leaderboard table ranks algorithms by overall success rate in descending order
- Leaderboard table includes all benchmarked algorithms (missing entries are backfilled from aggregate summaries if needed)
- if __name__ == "__main__": entry point
- black . formatting applied