com um clique
ros2-bag-utility
// ROS2 bag recording and analysis utilities with Clean Architecture (Python & C++)
// ROS2 bag recording and analysis utilities with Clean Architecture (Python & C++)
Configure and build gz-sim from source using either CMake/Ninja directly or a colcon workspace. Trigger when the user asks to build, recompile, configure, or set up the gz-sim binary tree.
Concise reference for the gz-sim Entity-Component-System architecture — how Entities, Components, Systems, the ECM, the Server, and SimulationRunner fit together. Trigger when the user asks how gz-sim is organized, where to add code, or how the simulation loop runs.
Run unit, integration, performance, and benchmark tests for gz-sim, filter by name, and debug a single failing case. Trigger when the user asks to test, rerun a test, debug a flaky test, or check that a change didn't regress something.
Scaffold a new ECS component header under include/gz/sim/components/, wire its CMake listing, and (if needed) register it with ComponentFactory for serialization. Trigger when the user asks to add a new component or a per-entity data field.
Scaffold a new Nav 2 plugin (controller, planner, behavior, smoother, goal-checker, progress-checker, costmap layer, or BT node). Wires pluginlib registration, parameter declaration on the lifecycle node, and a minimal integration test. Trigger when the user asks to write or extend a Nav 2 plugin.
Scaffold a new ROS 2 package (ament_python or ament_cmake) inside a colcon workspace, restructured to follow this template's Clean Architecture layout. Trigger when the user asks to create a new ROS 2 package or to bootstrap a Clean-Architecture-compliant codebase.
| name | ROS2 Bag Utility |
| description | ROS2 bag recording and analysis utilities with Clean Architecture (Python & C++) |
This skill provides guides for programmatic data recording (rosbag2) and replay mechanism adhering to Clean Architecture.
Defines the interface for recording and playback control.
# domain/interfaces/data_recorder.py
class IDataRecorder(ABC):
@abstractmethod
def start_recording(self, config: RecordingConfig) -> bool:
pass
@abstractmethod
def stop_recording(self) -> None:
pass
See previous Python example using rosbag2_py.
Using rosbag2_cpp library.
// infrastructure/ros2/bag/bag_recorder.hpp
#pragma once
#include <rclcpp/rclcpp.hpp>
#include <rosbag2_cpp/writer.hpp>
#include <rosbag2_storage/storage_options.hpp>
namespace infrastructure::ros2::bag {
class BagRecorder {
public:
explicit BagRecorder(rclcpp::Node::SharedPtr node) : node_(node) {}
bool start_recording(const std::string& bag_name, const std::vector<std::string>& topics) {
try {
writer_ = std::make_unique<rosbag2_cpp::Writer>();
rosbag2_storage::StorageOptions storage_options;
storage_options.uri = bag_name;
storage_options.storage_id = "sqlite3";
rosbag2_cpp::ConverterOptions converter_options;
converter_options.input_serialization_format = "cdr";
converter_options.output_serialization_format = "cdr";
writer_->open(storage_options, converter_options);
// Subscribe to topics and write
for (const auto& topic : topics) {
// Determine topic type dynamically or use GenericSubscribe (Humble+)
create_recorder_subscription(topic);
}
is_recording_ = true;
return true;
} catch (const std::exception& e) {
RCLCPP_ERROR(node_->get_logger(), "Failed to open bag: %s", e.what());
return false;
}
}
void stop_recording() {
writer_.reset(); // Closes the bag
subs_.clear();
is_recording_ = false;
}
template<typename T>
void write_message(const std::string& topic, const T& msg) {
if (is_recording_ && writer_) {
auto timestamp = node_->get_clock()->now();
writer_->write(msg, topic, timestamp);
}
}
private:
rclcpp::Node::SharedPtr node_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
std::vector<rclcpp::SubscriptionBase::SharedPtr> subs_;
bool is_recording_ = false;
void create_recorder_subscription(const std::string& topic) {
// Implementation for generic subscription...
}
};
} // namespace
// infrastructure/ros2/bag/bag_reader.hpp
#include <rosbag2_cpp/reader.hpp>
class BagReader {
public:
explicit BagReader(const std::string& bag_path) {
reader_ = std::make_unique<rosbag2_cpp::Reader>();
reader_->open(bag_path);
}
void read_all() {
while (reader_->has_next()) {
auto bag_message = reader_->read_next();
// Deserialization logic...
}
}
private:
std::unique_ptr<rosbag2_cpp::Reader> reader_;
};
sqlite3 is default, mcap is recommended for performance.cdr.