TB3-源码剖析-4.障碍物检测
import rospy import os from enum import Enum from std_msgs.msg import UInt8, Float64 from sensor_msgs.msg import LaserScan from turtlebot3_autorace_msgs.msg import MovingParam class DetectContruction(): def __init__(self): # subscribes state self.sub_scan_obstacle = rospy.Subscriber('/detect/scan', LaserScan, self.cbScanObstacle, queue_size=1) self.sub_construction_order = rospy.Subscriber('/detect/construction_order', UInt8, self.cbConstructionOrder, queue_size=1) self.sub_moving_completed = rospy.Subscriber('/control/moving/complete', UInt8, self.cbMovingComplete, queue_size = 1) # publishes state self.pub_construction_return = rospy.Publisher('/detect/construction_stamped', UInt8, queue_size=1) self.pub_moving = rospy.Publisher('/control/moving/state', MovingParam, queue_size= 1) self.pub_max_vel = rospy.Publisher('/control/max_vel', Float64, queue_size = 1) self.StepOfConstruction = Enum('StepOfConstruction', 'find_obstacle avoid_obstacle exit') self.is_obstacle_detected = False self.is_moving_complete = False def cbMovingComplete(self, data): self.is_moving_complete = True def cbConstructionOrder(self, order): msg_pub_construction_return = UInt8() if order.data == self.StepOfConstruction.find_obstacle.value: rospy.loginfo("find obstacle") while True: if self.is_obstacle_detected == True: rospy.loginfo("Encounter obstacle") break else: pass msg_pub_max_vel = Float64() msg_pub_max_vel.data = 0.0 self.pub_max_vel.publish(msg_pub_max_vel) rospy.sleep(1) msg_pub_construction_return.data = self.StepOfConstruction.avoid_obstacle.value elif order.data == self.StepOfConstruction.avoid_obstacle.value: rospy.loginfo("avoid obstacle") rospy.loginfo("go left") msg_moving = MovingParam() msg_moving.moving_type=2 msg_moving.moving_value_angular=90 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.25 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go right") msg_moving.moving_type=3 msg_moving.moving_value_angular=90 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.5 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go right") msg_moving.moving_type=3 msg_moving.moving_value_angular=90 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.4 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go left") msg_moving.moving_type=2 msg_moving.moving_value_angular=90 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving.moving_type=4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.4 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go left") msg_moving.moving_type=2 msg_moving.moving_value_angular=80 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving.moving_type=4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.4 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.loginfo("construction finished") msg_pub_construction_return.data = self.StepOfConstruction.exit.value elif order.data == self.StepOfConstruction.exit.value: rospy.loginfo("construction finished") msg_pub_construction_return.data = self.StepOfConstruction.exit.value self.pub_construction_return.publish(msg_pub_construction_return) rospy.sleep(3) def cbScanObstacle(self, scan): angle_scan = 25 scan_start = 0 - angle_scan scan_end = 0 + angle_scan threshold_distance = 0.2 is_obstacle_detected = False for i in range(scan_start, scan_end): if scan.ranges[i] < threshold_distance and scan.ranges[i] > 0.01: is_obstacle_detected = True self.is_obstacle_detected = is_obstacle_detected def main(self): rospy.spin() if __name__ == '__main__': rospy.init_node('detect_contruction') node = DetectContruction() node.main() Footer
<< 上一篇
下一篇 >>