TB3-源码剖析-3.车道线检测功能

import rospy    
import numpy as np    
import cv2    
from cv_bridge import CvBridge    
from std_msgs.msg import UInt8, Float64    
from sensor_msgs.msg import Image, CompressedImage    
from dynamic_reconfigure.server import Server    
from turtlebot3_autorace_detect.cfg import DetectLaneParamsConfig    
class DetectLane():    
def __init__(self):    
self.hue_white_l = rospy.get_param("~detect/lane/white/hue_l", 0)    
self.hue_white_h = rospy.get_param("~detect/lane/white/hue_h", 179)    
self.saturation_white_l = rospy.get_param("~detect/lane/white/saturation_l", 0)    
self.saturation_white_h = rospy.get_param("~detect/lane/white/saturation_h", 70)    
self.lightness_white_l = rospy.get_param("~detect/lane/white/lightness_l", 105)    
self.lightness_white_h = rospy.get_param("~detect/lane/white/lightness_h", 255)    
self.hue_yellow_l = rospy.get_param("~detect/lane/yellow/hue_l", 10)    
self.hue_yellow_h = rospy.get_param("~detect/lane/yellow/hue_h", 127)    
self.saturation_yellow_l = rospy.get_param("~detect/lane/yellow/saturation_l", 70)    
self.saturation_yellow_h = rospy.get_param("~detect/lane/yellow/saturation_h", 255)    
self.lightness_yellow_l = rospy.get_param("~detect/lane/yellow/lightness_l", 95)    
self.lightness_yellow_h = rospy.get_param("~detect/lane/yellow/lightness_h", 255)    
self.is_calibration_mode = rospy.get_param("~is_detection_calibration_mode", False)    
if self.is_calibration_mode == True:    
srv_detect_lane = Server(DetectLaneParamsConfig, self.cbGetDetectLaneParam)    
self.sub_image_type = "raw"         # you can choose image type "compressed", "raw"    
self.pub_image_type = "compressed"  # you can choose image type "compressed", "raw"    
if self.sub_image_type == "compressed":    
# subscribes compressed image    
self.sub_image_original = rospy.Subscriber('/detect/image_input/compressed', CompressedImage, self.cbFindLane, queue_size = 1)    
elif self.sub_image_type == "raw":    
# subscribes raw image    
self.sub_image_original = rospy.Subscriber('/detect/image_input', Image, self.cbFindLane, queue_size = 1)    
if self.pub_image_type == "compressed":    
# publishes lane image in compressed type     
self.pub_image_lane = rospy.Publisher('/detect/image_output/compressed', CompressedImage, queue_size = 1)    
elif self.pub_image_type == "raw":    
# publishes lane image in raw type    
self.pub_image_lane = rospy.Publisher('/detect/image_output', Image, queue_size = 1)    
if self.is_calibration_mode == True:    
if self.pub_image_type == "compressed":    
# publishes lane image in compressed type     
self.pub_image_white_lane = rospy.Publisher('/detect/image_output_sub1/compressed', CompressedImage, queue_size = 1)    
self.pub_image_yellow_lane = rospy.Publisher('/detect/image_output_sub2/compressed', CompressedImage, queue_size = 1)    
elif self.pub_image_type == "raw":    
# publishes lane image in raw type    
self.pub_image_white_lane = rospy.Publisher('/detect/image_output_sub1', Image, queue_size = 1)    
self.pub_image_yellow_lane = rospy.Publisher('/detect/image_output_sub2', Image, queue_size = 1)    
self.pub_lane = rospy.Publisher('/detect/lane', Float64, queue_size = 1)    
# subscribes state : yellow line reliability    
self.pub_yellow_line_reliability = rospy.Publisher('/detect/yellow_line_reliability', UInt8, queue_size=1)    
# subscribes state : white line reliability    
self.pub_white_line_reliability = rospy.Publisher('/detect/white_line_reliability', UInt8, queue_size=1)    
self.cvBridge = CvBridge()    
self.counter = 1    
self.window_width = 1000.    
self.window_height = 600.    
self.reliability_white_line = 100    
self.reliability_yellow_line = 100    
def cbGetDetectLaneParam(self, config, level):    
rospy.loginfo("[Detect Lane] Detect Lane Calibration Parameter reconfigured to")    
rospy.loginfo("hue_white_l : %d", config.hue_white_l)    
rospy.loginfo("hue_white_h : %d", config.hue_white_h)    
rospy.loginfo("saturation_white_l : %d", config.saturation_white_l)    
rospy.loginfo("saturation_white_h : %d", config.saturation_white_h)    
rospy.loginfo("lightness_white_l : %d", config.lightness_white_l)    
rospy.loginfo("lightness_white_h : %d", config.lightness_white_h)    
rospy.loginfo("hue_yellow_l : %d", config.hue_yellow_l)    
rospy.loginfo("hue_yellow_h : %d", config.hue_yellow_h)    
rospy.loginfo("saturation_yellow_l : %d", config.saturation_yellow_l)    
rospy.loginfo("saturation_yellow_h : %d", config.saturation_yellow_h)    
rospy.loginfo("lightness_yellow_l : %d", config.lightness_yellow_l)    
rospy.loginfo("lightness_yellow_h : %d", config.lightness_yellow_h)    
self.hue_white_l = config.hue_white_l    
self.hue_white_h = config.hue_white_h    
self.saturation_white_l = config.saturation_white_l    
self.saturation_white_h = config.saturation_white_h    
self.lightness_white_l = config.lightness_white_l    
self.lightness_white_h = config.lightness_white_h    
self.hue_yellow_l = config.hue_yellow_l    
self.hue_yellow_h = config.hue_yellow_h    
self.saturation_yellow_l = config.saturation_yellow_l    
self.saturation_yellow_h = config.saturation_yellow_h    
self.lightness_yellow_l = config.lightness_yellow_l    
self.lightness_yellow_h = config.lightness_yellow_h    
return config    
def cbFindLane(self, image_msg):    
# Change the frame rate by yourself. Now, it is set to 1/3 (10fps).     
# Unappropriate value of frame rate may cause huge delay on entire recognition process.    
# This is up to your computer's operating power.    
if self.counter % 3 != 0:    
self.counter += 1    
return    
else:    
self.counter = 1    
if self.sub_image_type == "compressed":    
#converting compressed image to opencv image    
np_arr = np.frombuffer(image_msg.data, np.uint8)    
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)    
elif self.sub_image_type == "raw":    
cv_image = self.cvBridge.imgmsg_to_cv2(image_msg, "bgr8")    
# find White and Yellow Lanes    
white_fraction, cv_white_lane = self.maskWhiteLane(cv_image)    
yellow_fraction, cv_yellow_lane = self.maskYellowLane(cv_image)    
try:    
if yellow_fraction > 3000:    
self.left_fitx, self.left_fit = self.fit_from_lines(self.left_fit, cv_yellow_lane)    
self.mov_avg_left = np.append(self.mov_avg_left,np.array([self.left_fit]), axis=0)    
if white_fraction > 3000:    
self.right_fitx, self.right_fit = self.fit_from_lines(self.right_fit, cv_white_lane)    
self.mov_avg_right = np.append(self.mov_avg_right,np.array([self.right_fit]), axis=0)    
except:    
if yellow_fraction > 3000:    
self.left_fitx, self.left_fit = self.sliding_windown(cv_yellow_lane, 'left')    
self.mov_avg_left = np.array([self.left_fit])    
if white_fraction > 3000:    
self.right_fitx, self.right_fit = self.sliding_windown(cv_white_lane, 'right')    
self.mov_avg_right = np.array([self.right_fit])    
MOV_AVG_LENGTH = 5    
self.left_fit = np.array([np.mean(self.mov_avg_left[::-1][:, 0][0:MOV_AVG_LENGTH]),    
np.mean(self.mov_avg_left[::-1][:, 1][0:MOV_AVG_LENGTH]),    
np.mean(self.mov_avg_left[::-1][:, 2][0:MOV_AVG_LENGTH])])    
self.right_fit = np.array([np.mean(self.mov_avg_right[::-1][:, 0][0:MOV_AVG_LENGTH]),    
np.mean(self.mov_avg_right[::-1][:, 1][0:MOV_AVG_LENGTH]),    
np.mean(self.mov_avg_right[::-1][:, 2][0:MOV_AVG_LENGTH])])    
if self.mov_avg_left.shape[0] > 1000:    
self.mov_avg_left = self.mov_avg_left[0:MOV_AVG_LENGTH]    
if self.mov_avg_right.shape[0] > 1000:    
self.mov_avg_right = self.mov_avg_right[0:MOV_AVG_LENGTH]    
self.make_lane(cv_image, white_fraction, yellow_fraction)    
def maskWhiteLane(self, image):    
# Convert BGR to HSV    
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)    
Hue_l = self.hue_white_l    
Hue_h = self.hue_white_h    
Saturation_l = self.saturation_white_l    
Saturation_h = self.saturation_white_h    
Lightness_l = self.lightness_white_l    
Lightness_h = self.lightness_white_h    
# define range of white color in HSV    
lower_white = np.array([Hue_l, Saturation_l, Lightness_l])    
upper_white = np.array([Hue_h, Saturation_h, Lightness_h])    
# Threshold the HSV image to get only white colors    
mask = cv2.inRange(hsv, lower_white, upper_white)    
# Bitwise-AND mask and original image    
res = cv2.bitwise_and(image, image, mask = mask)    
fraction_num = np.count_nonzero(mask)    
if self.is_calibration_mode == False:    
if fraction_num > 35000:    
if self.lightness_white_l < 250:    
self.lightness_white_l += 5    
elif fraction_num < 5000:    
if self.lightness_white_l > 50:    
self.lightness_white_l -= 5    
how_much_short = 0    
for i in range(0, 600):    
if np.count_nonzero(mask[i,::]) > 0:    
how_much_short += 1    
how_much_short = 600 - how_much_short    
if how_much_short > 100:    
if self.reliability_white_line >= 5:    
self.reliability_white_line -= 5    
elif how_much_short <= 100:    
if self.reliability_white_line <= 99:    
self.reliability_white_line += 5    
msg_white_line_reliability = UInt8()    
msg_white_line_reliability.data = self.reliability_white_line    
self.pub_white_line_reliability.publish(msg_white_line_reliability)    
if self.is_calibration_mode == True:    
if self.pub_image_type == "compressed":    
# publishes white lane filtered image in compressed type    
self.pub_image_white_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))    
elif self.pub_image_type == "raw":    
# publishes white lane filtered image in raw type    
self.pub_image_white_lane.publish(self.cvBridge.cv2_to_imgmsg(mask, "bgr8"))    
return fraction_num, mask    
def maskYellowLane(self, image):    
# Convert BGR to HSV    
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)    
Hue_l = self.hue_yellow_l    
Hue_h = self.hue_yellow_h    
Saturation_l = self.saturation_yellow_l    
Saturation_h = self.saturation_yellow_h    
Lightness_l = self.lightness_yellow_l    
Lightness_h = self.lightness_yellow_h    
# define range of yellow color in HSV    
lower_yellow = np.array([Hue_l, Saturation_l, Lightness_l])    
upper_yellow = np.array([Hue_h, Saturation_h, Lightness_h])    
# Threshold the HSV image to get only yellow colors    
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)    
# Bitwise-AND mask and original image    
res = cv2.bitwise_and(image, image, mask = mask)    
fraction_num = np.count_nonzero(mask)    
if self.is_calibration_mode == False:    
if fraction_num > 35000:    
if self.lightness_yellow_l < 250:    
self.lightness_yellow_l += 20    
elif fraction_num < 5000:    
if self.lightness_yellow_l > 90:    
self.lightness_yellow_l -= 20    
how_much_short = 0    
for i in range(0, 600):    
if np.count_nonzero(mask[i,::]) > 0:    
how_much_short += 1    
how_much_short = 600 - how_much_short    
if how_much_short > 100:    
if self.reliability_yellow_line >= 5:    
self.reliability_yellow_line -= 5    
elif how_much_short <= 100:    
if self.reliability_yellow_line <= 99:    
self.reliability_yellow_line += 5    
msg_yellow_line_reliability = UInt8()    
msg_yellow_line_reliability.data = self.reliability_yellow_line    
self.pub_yellow_line_reliability.publish(msg_yellow_line_reliability)    
if self.is_calibration_mode == True:    
if self.pub_image_type == "compressed":    
# publishes yellow lane filtered image in compressed type    
self.pub_image_yellow_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))    
elif self.pub_image_type == "raw":    
# publishes yellow lane filtered image in raw type    
self.pub_image_yellow_lane.publish(self.cvBridge.cv2_to_imgmsg(mask, "bgr8"))    
return fraction_num, mask    
def fit_from_lines(self, lane_fit, image):    
nonzero = image.nonzero()    
nonzeroy = np.array(nonzero[0])    
nonzerox = np.array(nonzero[1])    
margin = 100    
lane_inds = ((nonzerox > (lane_fit[0] * (nonzeroy ** 2) + lane_fit[1] * nonzeroy + lane_fit[2] - margin)) & (    
nonzerox < (lane_fit[0] * (nonzeroy ** 2) + lane_fit[1] * nonzeroy + lane_fit[2] + margin)))    
# Again, extract line pixel positions    
x = nonzerox[lane_inds]    
y = nonzeroy[lane_inds]    
# Fit a second order polynomial to each    
lane_fit = np.polyfit(y, x, 2)    
# Generate x and y values for plotting    
ploty = np.linspace(0, image.shape[0] - 1, image.shape[0])    
lane_fitx = lane_fit[0] * ploty ** 2 + lane_fit[1] * ploty + lane_fit[2]    
return lane_fitx, lane_fit    
def sliding_windown(self, img_w, left_or_right):    
histogram = np.sum(img_w[int(img_w.shape[0] / 2):, :], axis=0)    
# Create an output image to draw on and visualize the result    
out_img = np.dstack((img_w, img_w, img_w)) * 255    
# Find the peak of the left and right halves of the histogram    
# These will be the starting point for the left and right lines    
midpoint = np.int(histogram.shape[0] / 2)    
if left_or_right == 'left':    
lane_base = np.argmax(histogram[:midpoint])    
elif left_or_right == 'right':    
lane_base = np.argmax(histogram[midpoint:]) + midpoint    
# Choose the number of sliding windows    
nwindows = 20    
# Set height of windows    
window_height = np.int(img_w.shape[0] / nwindows)    
# Identify the x and y positions of all nonzero pixels in the image    
nonzero = img_w.nonzero()    
nonzeroy = np.array(nonzero[0])    
nonzerox = np.array(nonzero[1])    
# Current positions to be updated for each window    
x_current = lane_base    
# Set the width of the windows +/- margin    
margin = 50    
# Set minimum number of pixels found to recenter window    
minpix = 50    
# Create empty lists to receive lane pixel indices    
lane_inds = []    
# Step through the windows one by one    
for window in range(nwindows):    
# Identify window boundaries in x and y    
win_y_low = img_w.shape[0] - (window + 1) * window_height    
win_y_high = img_w.shape[0] - window * window_height    
win_x_low = x_current - margin    
win_x_high = x_current + margin    
# Draw the windows on the visualization image    
cv2.rectangle(out_img, (win_x_low, win_y_low), (win_x_high, win_y_high), (0, 255, 0), 2)    
# Identify the nonzero pixels in x and y within the window    
good_lane_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_x_low) & (    
nonzerox < win_x_high)).nonzero()[0]    
# Append these indices to the lists    
lane_inds.append(good_lane_inds)    
# If you found > minpix pixels, recenter next window on their mean position    
if len(good_lane_inds) > minpix:    
x_current = np.int(np.mean(nonzerox[good_lane_inds]))    
# Concatenate the arrays of indices    
lane_inds = np.concatenate(lane_inds)    
# Extract line pixel positions    
x = nonzerox[lane_inds]    
y = nonzeroy[lane_inds]    
# Fit a second order polynomial to each    
try:    
lane_fit = np.polyfit(y, x, 2)    
self.lane_fit_bef = lane_fit    
except:    
lane_fit = self.lane_fit_bef    
# Generate x and y values for plotting    
ploty = np.linspace(0, img_w.shape[0] - 1, img_w.shape[0])    
lane_fitx = lane_fit[0] * ploty ** 2 + lane_fit[1] * ploty + lane_fit[2]    
return lane_fitx, lane_fit    
def make_lane(self, cv_image, white_fraction, yellow_fraction):    
# Create an image to draw the lines on    
warp_zero = np.zeros((cv_image.shape[0], cv_image.shape[1], 1), dtype=np.uint8)    
color_warp = np.dstack((warp_zero, warp_zero, warp_zero))    
color_warp_lines = np.dstack((warp_zero, warp_zero, warp_zero))    
ploty = np.linspace(0, cv_image.shape[0] - 1, cv_image.shape[0])    
if yellow_fraction > 3000:    
pts_left = np.array([np.flipud(np.transpose(np.vstack([self.left_fitx, ploty])))])    
cv2.polylines(color_warp_lines, np.int_([pts_left]), isClosed=False, color=(0, 0, 255), thickness=25)    
if white_fraction > 3000:    
pts_right = np.array([np.transpose(np.vstack([self.right_fitx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_right]), isClosed=False, color=(255, 255, 0), thickness=25)    
self.is_center_x_exist = True    
if self.reliability_white_line > 50 and self.reliability_yellow_line > 50:    
if white_fraction > 3000 and yellow_fraction > 3000:    
centerx = np.mean([self.left_fitx, self.right_fitx], axis=0)    
pts = np.hstack((pts_left, pts_right))    
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)    
# Draw the lane onto the warped blank image    
cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))    
if white_fraction > 3000 and yellow_fraction <= 3000:    
centerx = np.subtract(self.right_fitx, 320)    
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)    
if white_fraction <= 3000 and yellow_fraction > 3000:    
centerx = np.add(self.left_fitx, 320)    
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)    
elif self.reliability_white_line <= 50 and self.reliability_yellow_line > 50:    
centerx = np.add(self.left_fitx, 320)    
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)    
elif self.reliability_white_line > 50 and self.reliability_yellow_line <= 50:    
centerx = np.subtract(self.right_fitx, 320)    
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])    
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)    
else:    
self.is_center_x_exist = False    
pass    
# Combine the result with the original image    
final = cv2.addWeighted(cv_image, 1, color_warp, 0.2, 0)    
final = cv2.addWeighted(final, 1, color_warp_lines, 1, 0)    
if self.pub_image_type == "compressed":    
if self.is_center_x_exist == True:    
# publishes lane center    
msg_desired_center = Float64()    
msg_desired_center.data = centerx.item(350)    
self.pub_lane.publish(msg_desired_center)    
self.pub_image_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(final, "jpg"))    
elif self.pub_image_type == "raw":    
if self.is_center_x_exist == True:    
# publishes lane center    
msg_desired_center = Float64()    
msg_desired_center.data = centerx.item(350)    
self.pub_lane.publish(msg_desired_center)    
self.pub_image_lane.publish(self.cvBridge.cv2_to_imgmsg(final, "bgr8"))    
def main(self):    
rospy.spin()    
if __name__ == '__main__':    
rospy.init_node('detect_lane')    
node = DetectLane()    
node.main()


    关键词:TB3-源码剖析