I am currently working on a RRT* path planning homework and I can't able to publish speeds to /cmd_vel for turtlesim3 gazebo. I can see the nodes are connected in rqt_graph but when I echo the topic nothing pops up. I can control the robot using teleop and terminal.I can see the nodes are connected in rqt_graph but when I echo the topic nothing pops up. I can control the robot using teleop and terminal.
#!/usr/bin/env python3
import matplotlib.pyplot as plt
import random
import math
import copy
import rospy
import time
import numpy as np
from numpy import genfromtxt
from numpy import array
from sensor_msgs.msg import NavSatFix
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from geometry_msgs.msg import Vector3Stamped
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Pose2D
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Float32MultiArray
show_animation = True
class RRT():
"""
Class for RRT Planning
"""
def __init__(self, start, goal, obstacles,
randArea, expandDis=1.0, goalSampleRate=5, maxIter = 500):
"""
Setting Parameter
start: Start Position [x, y]
goal: Goal Position [x, y]
obstacles: Obstacle Positions [[x, y, size], ...]
randArea: Random Sampling Area [min, max]
"""
self.start = Node(start[0], start[1])
self.end = Node(goal[0], goal[1])
self.minrand = randArea[0]
self.maxrand = randArea[1]
self.expandDis = expandDis
self.goalSampleRate = goalSampleRate
self.maxIter = maxIter
self.obstacles = obstacles
def Planning(self, animation=True):
"""
Path planning
animation: flag for animation on or off
"""
self.nodeList = [self.start]
while True:
# Random Sampling
if random.randint(0, 100) > self.goalSampleRate:
rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
else:
rnd = [self.end.x, self.end.y]
# Find nearest node
nind = self.GetNearestListIndex(self.nodeList, rnd)
# Expand tree
nearestNode = self.nodeList[nind]
theta = math.atan2(rnd[1] - nearestNode.y, rnd[0] - nearestNode.x)
newNode = copy.deepcopy(nearestNode)
newNode.x += self.expandDis * math.cos(theta)
newNode.y += self.expandDis * math.sin(theta)
newNode.parent = nind
if not self.__CollisionCheck(newNode, self.obstacles):
continue
self.nodeList.append(newNode)
print("Node list length:", len(self.nodeList))
# Check goal
dx = newNode.x - self.end.x
dy = newNode.y - self.end.y
d = math.sqrt(dx * dx + dy * dy)
if d <= self.expandDis:
print("Goal reached!")
break
if animation:
self.DrawGraph(rnd)
path = [[self.end.x, self.end.y]]
lastIndex = len(self.nodeList) - 1
while self.nodeList[lastIndex].parent is not None:
node = self.nodeList[lastIndex]
path.append([node.x, node.y])
lastIndex = node.parent
path.append([self.start.x, self.start.y])
return path
def DrawGraph(self, rnd=None):
"""
Draw Graph
"""
plt.clf()
if rnd is not None:
plt.plot(rnd[0], rnd[1], "^k")
for node in self.nodeList:
if node.parent is not None:
plt.plot([node.x, self.nodeList[node.parent].x], [
node.y, self.nodeList[node.parent].y], "-g")
for (ox, oy, size) in self.obstacles:
plt.plot(ox, oy, "ok", ms=30 * size)
plt.plot(self.start.x, self.start.y, "xr")
plt.plot(self.end.x, self.end.y, "xr")
plt.axis([-10, 10, -10, 10])
plt.grid(True)
plt.pause(0.01)
def GetNearestListIndex(self, nodeList, rnd):
dlist = [(node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2 for node in nodeList]
minind = dlist.index(min(dlist))
return minind
def __CollisionCheck(self, node, obstacles):
for (ox, oy, size) in obstacles:
dx = ox - node.x
dy = oy - node.y
d = math.sqrt(dx * dx + dy * dy)
if d <= size:
return False # Collision
return True # Safe
class Node():
"""
RRT Node
"""
def __init__(self, x, y):
self.x = x
self.y = y
self.parent = None
# Variable to store robot's current position
current_position = [0, 0] # Initial position of the robot
def topic2_callback(msg):
"""
Callback function for receiving robot's position from /topic2
This function updates the robot's position with data from Float32MultiArray.
"""
global current_position
# Assuming msg.data contains the position as [x, y]
current_position = msg.data
print(f"Received robot position: {current_position}")
def goal_pose_callback(msg):
"""
Callback function for receiving goal pose via /goal_pose topic.
"""
print(f"Received new goal position: ({msg.x}, {msg.y})")
start_position = current_position[:2] # Start position from current robot position
rand_area = [-15, 15] # Define the random sampling area for the planner
# Load obstacles from file
f = open("/home/ercan/catkin_ws/src/localization_ekf/obstacles.txt", "r")
obstacles = []
for i in range(8):
x = int(f.readline())
y = int(f.readline())
cap = int(f.readline())
obstacles.append((int(x), int(y), int(cap)))
f.close()
# Plan the path using RRT
rrt = RRT(start=start_position, goal=[msg.x, msg.y], randArea=rand_area, obstacles=obstacles)
path = rrt.Planning(animation=show_animation)
# Draw the final path
if show_animation:
rrt.DrawGraph()
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r')
plt.grid(True)
plt.show()
# Return the path
follow_path(path, pub, rate)
def follow_path(path, pub, rate):
"""
Function to follow the generated path using a simple control approach
"""
# Robot speed and control parameters
linear_speed = 0.5
angular_speed = 1.0
for i in range(1, len(path)):
x1, y1 = path[i - 1]
x2, y2 = path[i]
# Calculate the distance and angle to the next waypoint
distance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
angle = math.atan2(y2 - y1, x2 - x1)
# Initialize the Twist message
twist = Twist()
while distance > 0.1: # Continue moving until the robot reaches the waypoint
# Calculate the angle difference
angle_to_goal = math.atan2(y2 - current_position[1], x2 - current_position[0])
angle_diff = angle_to_goal - current_position[2]
# Normalize the angle difference to the range [-pi, pi]
angle_diff = (angle_diff + math.pi) % (2 * math.pi) - math.pi
# Adjust velocities based on distance and angle difference
if abs(angle_diff) > 0.1:
twist.linear.x = 0.0 # Stop moving forward if we need to rotate
twist.angular.z = angular_speed * angle_diff # Rotate towards the goal
else:
twist.linear.x = linear_speed # Move forward
twist.angular.z = 0.0 # Stop rotating
pub.publish(twist)
rate.sleep()
# Update current position (for simulation purposes, assuming we update it here)
current_position[0] += twist.linear.x * 0.1 # Update x based on linear speed
current_position[1] += twist.linear.x * 0.1 # Update y based on linear speed
distance = math.sqrt((x2 - current_position[0]) ** 2 + (y2 - current_position[1]) ** 2)
twist.linear.x = 0
twist.angular.z = 0
pub.publish(twist) # Stop the robot when finished
def main():
# Initialize the ROS node
rospy.init_node('motion_planning_node')
# Create a publisher to send /cmd_vel messages
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rate = rospy.Rate(10) # 10hz
# Print waiting message before goal input
print("Waiting for goal position input...")
# Subscribe to the /goal_pose topic to get the goal position
rospy.Subscriber('/goal_pose', Pose2D, goal_pose_callback)
# Subscribe to the /topic2 topic to get the robot's current position
rospy.Subscriber('/topic2', Float32MultiArray, topic2_callback)
rospy.spin()
if __name__ == '__main__':
main()
I think the problem is caused by placement or order of the codes but I won't able to find a solution. Thanks
I am currently working on a RRT* path planning homework and I can't able to publish speeds to /cmd_vel for turtlesim3 gazebo. I can see the nodes are connected in rqt_graph but when I echo the topic nothing pops up. I can control the robot using teleop and terminal.I can see the nodes are connected in rqt_graph but when I echo the topic nothing pops up. I can control the robot using teleop and terminal.
#!/usr/bin/env python3
import matplotlib.pyplot as plt
import random
import math
import copy
import rospy
import time
import numpy as np
from numpy import genfromtxt
from numpy import array
from sensor_msgs.msg import NavSatFix
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from geometry_msgs.msg import Vector3Stamped
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Pose2D
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Float32MultiArray
show_animation = True
class RRT():
"""
Class for RRT Planning
"""
def __init__(self, start, goal, obstacles,
randArea, expandDis=1.0, goalSampleRate=5, maxIter = 500):
"""
Setting Parameter
start: Start Position [x, y]
goal: Goal Position [x, y]
obstacles: Obstacle Positions [[x, y, size], ...]
randArea: Random Sampling Area [min, max]
"""
self.start = Node(start[0], start[1])
self.end = Node(goal[0], goal[1])
self.minrand = randArea[0]
self.maxrand = randArea[1]
self.expandDis = expandDis
self.goalSampleRate = goalSampleRate
self.maxIter = maxIter
self.obstacles = obstacles
def Planning(self, animation=True):
"""
Path planning
animation: flag for animation on or off
"""
self.nodeList = [self.start]
while True:
# Random Sampling
if random.randint(0, 100) > self.goalSampleRate:
rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
else:
rnd = [self.end.x, self.end.y]
# Find nearest node
nind = self.GetNearestListIndex(self.nodeList, rnd)
# Expand tree
nearestNode = self.nodeList[nind]
theta = math.atan2(rnd[1] - nearestNode.y, rnd[0] - nearestNode.x)
newNode = copy.deepcopy(nearestNode)
newNode.x += self.expandDis * math.cos(theta)
newNode.y += self.expandDis * math.sin(theta)
newNode.parent = nind
if not self.__CollisionCheck(newNode, self.obstacles):
continue
self.nodeList.append(newNode)
print("Node list length:", len(self.nodeList))
# Check goal
dx = newNode.x - self.end.x
dy = newNode.y - self.end.y
d = math.sqrt(dx * dx + dy * dy)
if d <= self.expandDis:
print("Goal reached!")
break
if animation:
self.DrawGraph(rnd)
path = [[self.end.x, self.end.y]]
lastIndex = len(self.nodeList) - 1
while self.nodeList[lastIndex].parent is not None:
node = self.nodeList[lastIndex]
path.append([node.x, node.y])
lastIndex = node.parent
path.append([self.start.x, self.start.y])
return path
def DrawGraph(self, rnd=None):
"""
Draw Graph
"""
plt.clf()
if rnd is not None:
plt.plot(rnd[0], rnd[1], "^k")
for node in self.nodeList:
if node.parent is not None:
plt.plot([node.x, self.nodeList[node.parent].x], [
node.y, self.nodeList[node.parent].y], "-g")
for (ox, oy, size) in self.obstacles:
plt.plot(ox, oy, "ok", ms=30 * size)
plt.plot(self.start.x, self.start.y, "xr")
plt.plot(self.end.x, self.end.y, "xr")
plt.axis([-10, 10, -10, 10])
plt.grid(True)
plt.pause(0.01)
def GetNearestListIndex(self, nodeList, rnd):
dlist = [(node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2 for node in nodeList]
minind = dlist.index(min(dlist))
return minind
def __CollisionCheck(self, node, obstacles):
for (ox, oy, size) in obstacles:
dx = ox - node.x
dy = oy - node.y
d = math.sqrt(dx * dx + dy * dy)
if d <= size:
return False # Collision
return True # Safe
class Node():
"""
RRT Node
"""
def __init__(self, x, y):
self.x = x
self.y = y
self.parent = None
# Variable to store robot's current position
current_position = [0, 0] # Initial position of the robot
def topic2_callback(msg):
"""
Callback function for receiving robot's position from /topic2
This function updates the robot's position with data from Float32MultiArray.
"""
global current_position
# Assuming msg.data contains the position as [x, y]
current_position = msg.data
print(f"Received robot position: {current_position}")
def goal_pose_callback(msg):
"""
Callback function for receiving goal pose via /goal_pose topic.
"""
print(f"Received new goal position: ({msg.x}, {msg.y})")
start_position = current_position[:2] # Start position from current robot position
rand_area = [-15, 15] # Define the random sampling area for the planner
# Load obstacles from file
f = open("/home/ercan/catkin_ws/src/localization_ekf/obstacles.txt", "r")
obstacles = []
for i in range(8):
x = int(f.readline())
y = int(f.readline())
cap = int(f.readline())
obstacles.append((int(x), int(y), int(cap)))
f.close()
# Plan the path using RRT
rrt = RRT(start=start_position, goal=[msg.x, msg.y], randArea=rand_area, obstacles=obstacles)
path = rrt.Planning(animation=show_animation)
# Draw the final path
if show_animation:
rrt.DrawGraph()
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r')
plt.grid(True)
plt.show()
# Return the path
follow_path(path, pub, rate)
def follow_path(path, pub, rate):
"""
Function to follow the generated path using a simple control approach
"""
# Robot speed and control parameters
linear_speed = 0.5
angular_speed = 1.0
for i in range(1, len(path)):
x1, y1 = path[i - 1]
x2, y2 = path[i]
# Calculate the distance and angle to the next waypoint
distance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
angle = math.atan2(y2 - y1, x2 - x1)
# Initialize the Twist message
twist = Twist()
while distance > 0.1: # Continue moving until the robot reaches the waypoint
# Calculate the angle difference
angle_to_goal = math.atan2(y2 - current_position[1], x2 - current_position[0])
angle_diff = angle_to_goal - current_position[2]
# Normalize the angle difference to the range [-pi, pi]
angle_diff = (angle_diff + math.pi) % (2 * math.pi) - math.pi
# Adjust velocities based on distance and angle difference
if abs(angle_diff) > 0.1:
twist.linear.x = 0.0 # Stop moving forward if we need to rotate
twist.angular.z = angular_speed * angle_diff # Rotate towards the goal
else:
twist.linear.x = linear_speed # Move forward
twist.angular.z = 0.0 # Stop rotating
pub.publish(twist)
rate.sleep()
# Update current position (for simulation purposes, assuming we update it here)
current_position[0] += twist.linear.x * 0.1 # Update x based on linear speed
current_position[1] += twist.linear.x * 0.1 # Update y based on linear speed
distance = math.sqrt((x2 - current_position[0]) ** 2 + (y2 - current_position[1]) ** 2)
twist.linear.x = 0
twist.angular.z = 0
pub.publish(twist) # Stop the robot when finished
def main():
# Initialize the ROS node
rospy.init_node('motion_planning_node')
# Create a publisher to send /cmd_vel messages
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rate = rospy.Rate(10) # 10hz
# Print waiting message before goal input
print("Waiting for goal position input...")
# Subscribe to the /goal_pose topic to get the goal position
rospy.Subscriber('/goal_pose', Pose2D, goal_pose_callback)
# Subscribe to the /topic2 topic to get the robot's current position
rospy.Subscriber('/topic2', Float32MultiArray, topic2_callback)
rospy.spin()
if __name__ == '__main__':
main()
I think the problem is caused by placement or order of the codes but I won't able to find a solution. Thanks
Share Improve this question asked Jan 18 at 16:43 Ercan İnanErcan İnan 11 bronze badge1 Answer
Reset to default 0This method of using variables does not work, and in fact will cause errors. If you're doing it this way your pub
should be a global variable at the top of your file. Then it should not be passed as a function arg and just used directly in your following function:
#!/usr/bin/env python3
import matplotlib.pyplot as plt
import random
import math
import copy
import rospy
import time
import numpy as np
from numpy import genfromtxt
from numpy import array
from sensor_msgs.msg import NavSatFix
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from geometry_msgs.msg import Vector3Stamped
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Pose2D
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Float32MultiArray
show_animation = True
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
class RRT():
....
The other issue is that this will cause a runtime error. So if the above node does not die it means you have another issue. Being either your /goal_pose
topic is not being published to or the messages being published to it are the same as your current position.