본문 바로가기
ROS2 강의 (Humble)

[ROS 2 강의] Topic Programming

by 바위폭주 2025. 6. 29.
728x90
반응형

이번 시간에는 ROS 2에서 토픽으로 메시지를 발행(Publish)하는 노드를 직접 만들어보겠습니다. 이전 시간에 ros2 topic pub 명령어로 토픽 메시지를 전송해봤다면, 이번에는 그 과정을 파이썬 코드로 자동화하는 실습입니다.


1. 패키지 생성 및 의존성 추가

먼저 토픽 퍼블리셔 노드를 만들기 위한 새 패키지를 생성합니다.

ros2 pkg create --build-type ament_python publisher_pkg --dependencies rclpy std_msgs geometry_msgs
  • rclpy: ROS 2 Python 클라이언트 라이브러리
  • std_msgs, geometry_msgs: 퍼블리셔에서 사용할 메시지 인터페이스

2. 퍼블리셔 노드 파일 작성

publisher_pkg 디렉터리 내에 simple_publisher.py라는 파이썬 파일을 새로 생성하고, 퍼블리셔 노드를 작성합니다. 이 노드는 0.5초마다 /cmd_vel 토픽에 Twist 메시지를 보내는 기능을 합니다.

import rclpy
# import the ROS2 python libraries
from rclpy.node import Node
# import the Twist interface from the geometry_msgs package
from geometry_msgs.msg import Twist

class SimplePublisher(Node):

    def __init__(self):
        # Here you have the class constructor
        # call super() in the constructor to initialize the Node object
        # the parameter you pass is the node name
        super().__init__('simple_publisher')
        # create the publisher object
        # in this case, the publisher will publish on /cmd_vel Topic with a queue size of 10 messages.
        # use the Twist module for /cmd_vel Topic
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        # define the timer period for 0.5 seconds
        timer_period = 0.5
        # create a timer sending two parameters:
        # - the duration between 2 callbacks (0.5 seconds)
        # - the timer function (timer_callback)
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def timer_callback(self):
        # Here you have the callback method
        # create a Twist message
        msg = Twist()
        # define the linear x-axis velocity of /cmd_vel Topic parameter to 0.5
        msg.linear.x = 0.5
        # define the angular z-axis velocity of /cmd_vel Topic parameter to 0.5
        msg.angular.z = 0.5
        # Publish the message to the Topic
        self.publisher_.publish(msg)
        # Display the message on the console
        self.get_logger().info('Publishing: "%s"' % msg)

def main(args=None):
    # initialize the ROS communication
    rclpy.init(args=args)
    # declare the node constructor
    simple_publisher = SimplePublisher()
    # pause the program execution, waits for a request to kill the node (ctrl+c)
    rclpy.spin(simple_publisher)
    # Explicity destroys the node
    simple_publisher.destroy_node()
    # shutdown the ROS communication
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  • /cmd_vel은 로봇 제어를 위한 속도 명령 토픽입니다.
  • Twist 메시지는 선형(linear)과 각속도(angular)를 포함합니다.

3. 런치 파일 생성

작성한 퍼블리셔 노드를 실행하기 위한 런치 파일( publisher_pkg_launch_file.launch.py )을 launch 디렉터리 안에 생성합니다. 

cd ~/ros2_ws/src/publisher_pkg
mkdir launch
cd ~/ros2_ws/src/publisher_pkg/launch
touch publisher_pkg_launch_file.launch.py
chmod +x publisher_pkg_launch_file.launch.py

해당 파일에는 Node 클래스를 사용해 우리가 만든 simple_publisher.py를 실행시키는 코드가 들어갑니다.

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='publisher_pkg',
            executable='simple_publisher',
            output='screen'),
    ])

 


4. setup.py 수정

setup.py 파일의 entry_points에 simple_publisher.py를 실행 파일로 등록합니다. 그래야 런치 파일이나 CLI 명령어로 실행할 수 있습니다.

from setuptools import setup
import os
from glob import glob

package_name = 'publisher_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'simple_publisher = publisher_pkg.simple_publisher:main'
        ],
    },
)

5. 패키지 빌드 및 실행

이제 전체 패키지를 빌드하고, 런치 파일을 실행하면 퍼블리셔 노드가 작동하게 됩니다.

cd ~/ros2_ws
colcon build --symlink-install --packages-select publisher_pkg
source ~/ros2_ws/install/local_setup.bash
ros2 launch publisher_pkg publisher_pkg_launch_file.launch.py

6. 메시지 확인 및 정지

노드가 메시지를 잘 발행하고 있는지 확인하려면 다음 명령어를 실행합니다.

ros2 topic echo /cmd_vel

정지시키고 싶다면 런치 터미널에서 Ctrl+C로 노드를 종료한 뒤, 다음 명령으로 로봇을 멈출 수 있습니다.

ros2 topic pub --once /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"

코드 해설

simple_publisher.py

  • 노드 초기화
super().__init__('simple_publisher')
  • 퍼블리셔 생성
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
  • 타이머 설정 (0.5초 간격)
self.timer = self.create_timer(timer_period, self.timer_callback)
  • 메시지 발행
self.publisher_.publish(msg)

이번 실습에서는 ROS 2에서 노드를 만들고, 토픽으로 메시지를 자동 발행하는 과정을 배워봤습니다. 이런 방식을 통해 로봇 제어를 반복적이고 안정적으로 수행할 수 있습니다. 다음 시간에는 서브스크라이버 노드를 구현해보며 토픽 수신에 대해 배워보겠습니다.

궁금한 점이 있거나 실습 중 막히는 부분이 있다면 언제든지 댓글로 남겨주세요!

 

여기까지 잘 따라오셨을 까요? 강의를 따라 하고 계신 분이 있다면, 꼭 댓글을 남겨주세요.
여러분의 댓글이 저에게 큰 동기부여가 됩니다.
강의를 따라 하면서 궁금한 점이나 막히는 부분이 있다면 언제든지 자유롭게 질문 남겨주세요!
728x90
반응형