RADU: Processing & Interpreting ROS Movement Messages with Python

Starting ROS

$> roscore
... logging to /home/devcon/.ros/log/6c0f6dc0-e33f-11eb-9a51-737b11e35b15/roslaunch-giga-442540.log
Checking log directory for disk usage. This may take a while.
Press Ctrl-C to interrupt
Done checking log file disk usage. Usage is <1GB.
started roslaunch server http://giga:45125/
ros_comm version 1.15.11
SUMMARY
========
PARAMETERS
* /rosdistro: noetic
* /rosversion: 1.15.11
NODESauto-starting new master
process[master]: started with pid [442548]
ROS_MASTER_URI=http://giga:11311/
setting /run_id to 6c0f6dc0-e33f-11eb-9a51-737b11e35b15
process[rosout-1]: started with pid [442558]
started core service [/rosout]
$> rosrun teleop_twist_keyboard teleop_twist_keyboard.pyReading from the keyboard  and Publishing to Twist!
---------------------------
Moving around:
u i o
j k l
m , .
For Holonomic mode (strafing), hold down the shift key:
---------------------------
U I O
J K L
M < >
t : up (+z)
b : down (-z)
anything else : stopq/z : increase/decrease max speeds by 10%
w/x : increase/decrease only linear speed by 10%
e/c : increase/decrease only angular speed by 10%
CTRL-C to quitcurrently: speed 0.5 turn 1.0
...
currently: speed 0.5 turn 0.47351393100000017
$> rostopic list
/rosout
/rosout_agg
/cmd_vel
/color_sensor
/pose

Twist Message Format

$> rostopic type /cmd_vel
geometry_msgs/Twist
$> rosmsg show geometry_msgs/Twist
geometry_msgs/Vector3 linear
float64 x
float64 y
float64 z
geometry_msgs/Vector3 angular
float64 x
float64 y
float64 z

TWIST Message Format Explained

linear: 
x: 2.0
y: 0.0
z: 0.0
angular: 
x: 0.0
y: 0.0
z: 1.5

ROS: Twist Message Provider

catkin_create_pkg teleop_listener twist rospy
# scripts/listener.py
import rospy
import time
from geometry_msgs.msg import Twist
def callback(twist_msg):
rospy.loginfo(rospy.get_caller_id() + "Received Data")
rospy.loginfo('Sending ' + repr(twist_msg))
def listener():
rospy.init_node('twist_listener')
rospy.Subscriber('/cmd_vel', Twist, callback)
rospy.loginfo('Twist Listener started')
rospy.spin()
if __name__ == '__main__':
listener()
[INFO] [1627208023.888022]: Twist Listener started
[INFO] [1627208028.010496]: /twist_listenerReceived Data
[INFO] [1627208028.014720]: Sending linear:
x: 2.0
y: 0.0
z: 0.0
angular:
x: 0.0
y: 0.0
z: 0.0

Twist Message Format Wrapper on the Host Computer

# scripts/listener_refined.py
import rospy
import time
from geometry_msgs.msg import Twist
def callback(twist_msg):
rospy.loginfo(rospy.get_caller_id() + "Received Data")
radu_msg = repr(f'TWIST_MSG:(MOVE={twist_msg.linear.x}, TURN={twist_msg.angular.z})')
rospy.loginfo('Sending <<' + radu_msg + '>>')
ser.write((radu_msg + "\n").encode('utf-8'))
def listener():
rospy.init_node('radu_mk1')
rospy.Subscriber('/cmd_vel', Twist, callback)
rospy.spin()
if __name__ == '__main__':
listener()

Twist Message Format Gateway on the Microcontroller

#robot/radu/message_gateway.python
class MessageGateway():
def __init__(self, serial_obj):
self._serial = serial_obj
self.speed_limits= limits['speed']
self.twist_matcher = ure.compile('TWIST_MSG')
self.move_matcher = ure.compile('MOVE=(-?\d+.\d+)')
self.turn_matcher = ure.compile('TURN=(-?\d+.\d+)')
while True:  
obj = gateway.poll()
if obj:
robot.process(obj)
sleep_ms(5)
def poll(self):
msg = self._serial.read()
if msg:
commands = self.parse(msg)
return commands
return None
  • Check if the message is a TWIST message (by using regular expressions)
  • For the MOVE and TURN commands, extract the float value (by using regular expressions
  • return a list of tuples with the commands
def parse2(self, msg):
try:
if self.twist_matcher.search(msg):
move_string = self.move_matcher.search(msg).groups()
move_value = float(move_string[0])
turn_string = self.turn_matcher.search(msg).groups()
turn_value = float(turn_string[0])
result = [('MOVE', move_value), ('TURN', turn_value)]
print('PARSE2', result)
return({'TWIST_MSG': result})
except Exception as e:
print('MessageGateway error', msg, e)
return None

Motor Controller on the Microcontroller

def process_twist_msg(self, commands):
linear = angular = 0
for tuple in commands:
key, value = tuple
if key == 'MOVE':
linear = value
if key == 'TURN':
angular = value
m1_duty_percent = (linear - (angular * WHEEL_DISTANCE))*2
m2_duty_percent = (linear + (angular * WHEEL_DISTANCE))*2
duty_cycle1 = self.clip(m1_duty_percent)
duty_cycle2 = self.clip(m2_duty_percent)
self.logger.status("M1: {} dty".format(duty_cycle1)) self.m1.move2(duty_cycle1)
self.m2.move2(duty_cycle2)

Conclusion

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Sebastian

Sebastian

IT Project Manager & Developer