| name | ROS2 Node Creation |
| description | Guide for creating ROS2 nodes following Clean Architecture principles (Python & C++) |
ROS2 Node Creation Skill
This skill is used to create ROS2 nodes that adhere to Clean Architecture principles. It covers both Python and C++ implementations.
Directory Structure
src/
āāā domain/ # Business Logic Layer
ā āāā entities/ # Core business objects
ā āāā repositories/ # Repository interfaces (abstract)
ā āāā use_cases/ # Business rules
āāā application/ # Application Layer
ā āāā services/ # Application services
ā āāā interfaces/ # Port interfaces
āāā infrastructure/ # Infrastructure Layer
āāā ros2/
āāā nodes/ # ROS2 Node implementations
āāā publishers/ # Publisher adapters
āāā subscribers/ # Subscriber adapters
āāā services/ # Service adapters
Python Implementation
1. Base Node Template
"""
ROS2 Node: [NodeName]
Description: [Node Description]
"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from typing import Optional, Callable
from abc import ABC, abstractmethod
class BaseNode(Node, ABC):
"""Base class for all nodes."""
def __init__(self, node_name: str):
super().__init__(node_name)
self._setup_parameters()
self._setup_publishers()
self._setup_subscribers()
self._setup_services()
self._setup_timers()
self.get_logger().info(f'{node_name} initialized')
@abstractmethod
def _setup_parameters(self) -> None:
"""Define ROS2 parameters."""
pass
@abstractmethod
def _setup_publishers(self) -> None:
"""Create publishers."""
pass
@abstractmethod
def _setup_subscribers(self) -> None:
"""Create subscribers."""
pass
def _setup_services(self) -> None:
"""Create services (optional)."""
pass
def _setup_timers(self) -> None:
"""Create timers (optional)."""
pass
def get_default_qos(self) -> QoSProfile:
"""Default QoS profile."""
return QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
2. Concrete Node Implementation
C++ Implementation
1. Base Node Template (Header)
#pragma once
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <memory>
namespace infrastructure::ros2::nodes {
class BaseNode : public rclcpp::Node {
public:
explicit BaseNode(const std::string& node_name,
const rclcpp::NodeOptions& options = rclcpp::NodeOptions());
virtual ~BaseNode() = default;
protected:
virtual void setup_parameters() = 0;
virtual void setup_publishers() = 0;
virtual void setup_subscribers() = 0;
virtual void setup_services() {}
virtual void setup_timers() {}
rclcpp::QoS get_default_qos() const;
};
}
2. Base Node Template (Source)
#include "infrastructure/ros2/nodes/base_node.hpp"
namespace infrastructure::ros2::nodes {
BaseNode::BaseNode(const std::string& node_name, const rclcpp::NodeOptions& options)
: Node(node_name, options) {
}
rclcpp::QoS BaseNode::get_default_qos() const {
return rclcpp::QoS(10)
.reliability(rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_RELIABLE)
.history(rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_LAST);
}
}
3. Concrete Node Implementation (C++)
#pragma once
#include "infrastructure/ros2/nodes/base_node.hpp"
#include "application/services/sensor_service.hpp"
#include <std_msgs/msg/float64.hpp>
#include <sensor_msgs/msg/temperature.hpp>
namespace infrastructure::ros2::nodes {
class SensorNode : public BaseNode {
public:
explicit SensorNode(const rclcpp::NodeOptions& options = rclcpp::NodeOptions());
void set_sensor_service(std::shared_ptr<application::services::ISensorService> service);
protected:
void setup_parameters() override;
void setup_publishers() override;
void setup_subscribers() override;
void setup_timers() override;
private:
void raw_callback(const std_msgs::msg::Float64::SharedPtr msg);
void timer_callback();
std::shared_ptr<application::services::ISensorService> sensor_service_;
rclcpp::Publisher<sensor_msgs::msg::Temperature>::SharedPtr temp_pub_;
rclcpp::Subscription<std_msgs::msg::Float64>::SharedPtr raw_sub_;
rclcpp::TimerBase::SharedPtr timer_;
double update_rate_;
std::string sensor_topic_;
};
}
4. Dependency Injection (C++)
#pragma once
#include "domain/entities/sensor_data.hpp"
namespace application::services {
class ISensorService {
public:
virtual ~ISensorService() = default;
virtual domain::entities::SensorData process(double raw_data) = 0;
};
}
QoS Profiles (C++)
#pragma once
#include <rclcpp/qos.hpp>
namespace infrastructure::ros2 {
class QoSProfiles {
public:
static rclcpp::QoS sensor_data() {
return rclcpp::QoS(1).best_effort().durability_volatile();
}
static rclcpp::QoS command() {
return rclcpp::QoS(10).reliable().transient_local();
}
};
}