DEV Community

Cover image for RADU: Processing & Interpreting ROS Movement Messages with Python
Sebastian
Sebastian

Posted on

RADU: Processing & Interpreting ROS Movement Messages with Python

When using the Robot Operating System, nodes are started, topics published, messages send. Internally, the ROS nodes use these messages to communicate with each other. Now, if you build a robot that uses ROS as the communication middleware, you need to write custom code which will parse these messages and transform them to meaningful commands for your robot.

This article is a step-by-step tutorial about making your robot move based on ROS messages with the Twist message format. First, we will start ROS nodes and investigate the Twist messages that they exchange. Second, we need to decide the best approach and format to handle convert this data into a form that the robot understands. Finally, we implement the logic in the robot.

The technical context of this article is Ubuntu 20.04 LTS with latest ros-noetic and Python 3.8.10

This article originally appeared at my blog admantium.com.

Starting ROS

For the start, we need three different terminals.

In the first terminal, start roscore.

$> roscore
... logging to /home/devcon/.ros/log/6c0f6dc0-e33f-11eb-9a51-737b11e35b15/roslaunch-giga-442540.log
Checking log directory for disk usage. This may take a while.
Press Ctrl-C to interrupt
Done checking log file disk usage. Usage is <1GB.

started roslaunch server http://giga:45125/
ros_comm version 1.15.11


SUMMARY
========

PARAMETERS
 * /rosdistro: noetic
 * /rosversion: 1.15.11

NODES

auto-starting new master
process[master]: started with pid [442548]
ROS_MASTER_URI=http://giga:11311/

setting /run_id to 6c0f6dc0-e33f-11eb-9a51-737b11e35b15
process[rosout-1]: started with pid [442558]
started core service [/rosout]
Enter fullscreen mode Exit fullscreen mode

Then in the second, start the teleop-twist-keyboard. With this node, you can use the keyboard to generate messages.

$> rosrun teleop_twist_keyboard teleop_twist_keyboard.py

Reading from the keyboard  and Publishing to Twist!
---------------------------
Moving around:
   u    i    o
   j    k    l
   m    ,    .

For Holonomic mode (strafing), hold down the shift key:
---------------------------
   U    I    O
   J    K    L
   M    <    >

t : up (+z)
b : down (-z)

anything else : stop

q/z : increase/decrease max speeds by 10%
w/x : increase/decrease only linear speed by 10%
e/c : increase/decrease only angular speed by 10%

CTRL-C to quit

currently: speed 0.5 turn 1.0 
...
currently: speed 0.5 turn 0.47351393100000017 
Enter fullscreen mode Exit fullscreen mode

But where are these messages generated? Let’s explore the available ROS topics.

$> rostopic list
/rosout
/rosout_agg
/cmd_vel
/color_sensor
/pose
Enter fullscreen mode Exit fullscreen mode

The messages are published in the topic /cmd_vel.

Twist Message Format

We can check the data format of the topic /cmd_vel.

$> rostopic type /cmd_vel
geometry_msgs/Twist

$> rosmsg show geometry_msgs/Twist
geometry_msgs/Vector3 linear
  float64 x
  float64 y
  float64 z
geometry_msgs/Vector3 angular
  float64 x
  float64 y
  float64 z
Enter fullscreen mode Exit fullscreen mode

In addition to the command line, we can also use the graphical tool rqt.

But what is the meaning of this message?

TWIST Message Format Explained

The twist message format consists of two parts. In its linear component, the planar directions for the three dimensions x, y, and z are defines as float values. Think of this as the speed your robot should drive in each direction. For example, the following command shows that a robot should move with the speed of 2 in the direction of x.

linear: 
  x: 2.0
  y: 0.0
  z: 0.0
Enter fullscreen mode Exit fullscreen mode

The angular component defines the change in orientation of a movement, for the three coordinates x, y, and z, expressed as radians per second. The coordinates represent the yaw, pitch and roll axis. This means that the z value describes a change in the 2d plane.

angular: 
  x: 0.0
  y: 0.0
  z: 1.5
Enter fullscreen mode Exit fullscreen mode

The unit radians means: Given a circle with radius r, a radian is the distance r along the circles border.

ROS: Twist Message Provider

Understanding the data format, lets define a ROS node that subscribes to this message.

First, we create a new ROS package.

catkin_create_pkg teleop_listener twist rospy
Enter fullscreen mode Exit fullscreen mode

Then, create the file scripts/listener.py. This script will create a ROS node that subscribes to the topic /cmd_vel. For simplicity, we will just print the received messages.

# scripts/listener.py
import rospy
import time
from geometry_msgs.msg import Twist


def callback(twist_msg):
  rospy.loginfo(rospy.get_caller_id() + "Received Data")
  rospy.loginfo('Sending ' + repr(twist_msg))

def listener():
  rospy.init_node('twist_listener')
  rospy.Subscriber('/cmd_vel', Twist, callback)
  rospy.loginfo('Twist Listener started')
  rospy.spin()

if __name__ == '__main__':
  listener()
Enter fullscreen mode Exit fullscreen mode

Starting this program, we can see that the twist messages are successfully parsed.

[INFO] [1627208023.888022]: Twist Listener started
[INFO] [1627208028.010496]: /twist_listenerReceived Data
[INFO] [1627208028.014720]: Sending linear: 
  x: 2.0
  y: 0.0
  z: 0.0
angular: 
  x: 0.0
  y: 0.0
  z: 0.0
Enter fullscreen mode Exit fullscreen mode

Now we need to think about how this format will be interpreted by our robot.

Twist Message Format Wrapper on the Host Computer

The received messages have only two values that are meaningful for a planar, wheeled robot. The linear x value determines the forward or backward direction, and the angular z determines the amount of turns that the robot needs to make.

As documented in my earlier articles best practices for handling serial data, the most performant data formats that the motor controller receives is either bit fields or texts. Texts have the additional benefit for being more extensive than bit fields, and therefore we will use them.

The scripts/listener.py will extract the linear and angular value, and create a text message in the form TWIST_MSG:(MOVE=2.0, TURN=0.0). Here is the code:

# scripts/listener_refined.py
import rospy
import time
from geometry_msgs.msg import Twist

def callback(twist_msg):
  rospy.loginfo(rospy.get_caller_id() + "Received Data")
  radu_msg = repr(f'TWIST_MSG:(MOVE={twist_msg.linear.x}, TURN={twist_msg.angular.z})')
  rospy.loginfo('Sending <<' + radu_msg + '>>')
  ser.write((radu_msg + "\n").encode('utf-8'))

def listener():
  rospy.init_node('radu_mk1')
  rospy.Subscriber('/cmd_vel', Twist, callback)
  rospy.spin()

if __name__ == '__main__':
  listener()
Enter fullscreen mode Exit fullscreen mode

Twist Message Format Gateway on the Microcontroller

The received message seems simple, but we cannot parse it straight ahead. The values for MOVE and TURN need to translated to concrete commands of your robot. This involved consideration such as the speed values, the wheel diameter, the turn radius, and much more. I highly recommend this great blog article about motor controller that summarizes the essential math. In essence: Movement and turn commands are speed values, and these needs to be converted to duty cycles of PWM signals send to the motors.

Handling messages and transforming them to the internal format is responsibility of a class that I termed MessageGateway. This class is instantiated with a concrete serial connection interface, then actively polls the interface for new messages, and of available, parses and translates them

Let’s develop the gateway step-by-step.

The main.py class create an instance of MessageGateway with a serial connection object.

#robot/radu/message_gateway.python
class MessageGateway():
  def __init__(self, serial_obj):
    self._serial = serial_obj
    self.speed_limits= limits['speed']

    self.twist_matcher = ure.compile('TWIST_MSG')
    self.move_matcher = ure.compile('MOVE=(-?\d+.\d+)')
    self.turn_matcher = ure.compile('TURN=(-?\d+.\d+)')
Enter fullscreen mode Exit fullscreen mode

Once created, main.py will regularly call gateway.poll()

while True:  
  obj = gateway.poll()

  if obj:
      robot.process(obj)

  sleep_ms(5)
Enter fullscreen mode Exit fullscreen mode

The poll() method checks if any data from its serial connection exists...

 def poll(self):
    msg = self._serial.read()
    if msg:
      commands = self.parse(msg)
      return commands

    return None
Enter fullscreen mode Exit fullscreen mode

... and if yes, will call the parse() method. In this method, the following steps happen:

  • Check if the message is a TWIST message (by using regular expressions)
  • For the MOVE and TURN commands, extract the float value (by using regular expressions
  • return a list of tuples with the commands
  def parse2(self, msg):
    try:
      if self.twist_matcher.search(msg):
        move_string = self.move_matcher.search(msg).groups()
        move_value = float(move_string[0])

        turn_string = self.turn_matcher.search(msg).groups()
        turn_value = float(turn_string[0])

        result = [('MOVE', move_value), ('TURN', turn_value)]
        print('PARSE2', result)

        return({'TWIST_MSG': result})
    except Exception as e:
      print('MessageGateway error', msg, e)
    return None
Enter fullscreen mode Exit fullscreen mode

Motor Controller on the Microcontroller

The next step is to process the movements commands inside main.py. The following method parses the command tuple and saves the movement values which are float values. From this, it calculates the duty cycle for both wheels, and clips the values so they confirm with the motor (in my case: a minimum of 49000, and a maximum of 65000). Then it sets the duty cycle on the servo motors.

def process_twist_msg(self, commands):
  linear = angular = 0

  for tuple in commands:
    key, value = tuple
    if key == 'MOVE':
      linear = value
    if key == 'TURN':
      angular = value

  m1_duty_percent = (linear - (angular * WHEEL_DISTANCE))*2
  m2_duty_percent = (linear + (angular * WHEEL_DISTANCE))*2

  duty_cycle1 = self.clip(m1_duty_percent) 
  duty_cycle2 = self.clip(m2_duty_percent)

  self.logger.status("M1: {} dty".format(duty_cycle1))

  self.m1.move2(duty_cycle1)
  self.m2.move2(duty_cycle2)
Enter fullscreen mode Exit fullscreen mode

Conclusion

When connecting your robot to ROS, you need to parse the ROS specific messages to your robot. In this article, I showed how to translate the ROS Twist message format, which are movement information, to a format that the Robot can understand. The design space for this problem is huge. First, you need to decide how much interpretation of the format you are doing on the receiving ROS node, and how much the microcontroller to which the message is passed needs to do. Second, you decide the data format. The solution shown here evolved: While I was first passing complete Python objects, performance tests showed that working with texts is way faster. Third, you need to divide how to process the message inside your robot. For this, I adopted the Gateway pattern: A central entity reads, parses, transforms the messages, and applies conversion (e.g. set appropriates speed limits). If your application gets complex, and when you need to send even more messages, then having a central Gateway class that encapsulate all of these steps is essential. Fourth, you need to transform the obtained values into concrete motor controller commands. In my case, this boils down to duty cycles of PWM, clipped into the possible minimum and maximum values.

Top comments (0)