Source code for ROS_Stack.jetracer_speedway_navigation.src.publish_goalPoint

#!/usr/bin/env python3

#
# This file is part of the repo: https://github.com/marqinhos/JetRacer_Autonomous_Driving
# If you find the code useful, please cite the Author: Marcos Fernandez Gonzalez
# 
# Copyright 2023 The JetRacer Autonomous Driving Author. All Rights Reserved.
#
# Licensed under the AGPL-3.0 License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.gnu.org/licenses/agpl-3.0.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================================


import rospy
import cv2
import numpy as np
import os 
import threading as th
import signal
import yaml

# Import YOLO
from ultralytics import YOLO

# Import type of msgs
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
from jetracer_speedway_msgs.msg import Points
# Import converter ros image to cv2 image
from cv_bridge import CvBridge

# Import utils
from utils import Point, Features_Detection
# Import detections features
# from .utils.image_process import Features_Detection



[docs] class ProcessImage(th.Thread): """Class to subscribe a topic to take image and by means of IA extract the lane and lines segmentation. Them process the segmentation, to extract the desired point to go the robot """ def __init__(self, model_ia_path: str) -> "ProcessImage": ########################### Threads ########################### # Initialize thread th.Thread.__init__(self) ########################### YAML ########################### # Load config.yaml yaml_path = rospy.get_param('config_file') with open(yaml_path, 'r') as f: config = yaml.safe_load(f) ########################### Running ########################### self.running = True ########################### IA ########################### self.model_ia_path = model_ia_path self.model_ia = YOLO(self.model_ia_path) self.predict_ia_conf = config["confidence"] ########################### IMAGE ########################### self.frame = None self.bridge = CvBridge() self.last_message_seq = None #################### FEATURES DETECTION #################### self.compute_detect = Features_Detection() self.last_desired_pt = Point(0, 0) ########################### ROS ########################### ## Constants self.name_sub = config["sensors"]["pub_name_img"] self.name_pub = config["navigation"]["pub_name_img"] self.name_ros_node = config["navigation"]["node_name_img"] ## Initialize node of ros rospy.init_node(self.name_ros_node) ## Rate self.rate = config["rate"] self.rospyRate = rospy.Rate(self.rate) ## Publisher and Subscribers self.pub_vels = rospy.Publisher(self.name_pub, Points, queue_size=1) rospy.wait_for_message(self.name_sub, Image) self.sub_img = rospy.Subscriber(self.name_sub, Image, self.__callback_image)
[docs] def run(self) -> None: """Main void """ while self.running: ## Wait for a frame if self.frame is None: self.rospyRate.sleep() continue ## Execute ia to extract features result = self.__run_ia(self.frame) ## Get desired Point try: #Cuando la IA detecta algo, esta funcion devuelve None desired_pt = self.compute_detect.run(result) if not self.last_desired_pt.zero(): if abs(self.last_desired_pt.x - desired_pt.x) > 95 or abs(self.last_desired_pt.y - desired_pt.y) > 80: rospy.logwarn("BAD DESIRED POINT") desired_pt = self.last_desired_pt.middle_2_point(desired_pt) self.last_desired_pt = desired_pt except: desired_pt = self.last_desired_pt ## Show Point in image ##self.__show(self.frame, desired_pt) ## Show segmentation using Yolov8 method self.__show_segmentation(result) ## Publish desired point self.publish_point(desired_pt) # rospy.loginfo(f"Point: {desired_pt}") self.rospyRate.sleep() rospy.loginfo(f"Shutdown {self.name_ros_node}")
[docs] def publish_point(self, desired_point: Point) -> None: """Void to publish desired point in Points format. The topic to publish is "jetracer_vels" Args: angle_deg (float): Angle """ real_pose = Point(self.compute_detect.size[0]//2, self.compute_detect.size[1]-10) # Create Points message point_msg = Points() point_msg.x = desired_point.x point_msg.y = desired_point.y point_msg.xr = real_pose.x point_msg.yr = real_pose.y # Publish message in topic vels_jetracer self.pub_vels.publish(point_msg)
def __show(self, image: np.ndarray, point: Point) -> None: """Void to show desired point, and real point Args: image (np.ndarray): Image to show point (Point): Point to draw in image """ cv2.circle(image, (point.x, point.y), 5, (0, 0, 255), -1) cv2.circle(image, (self.compute_detect.size[0]//2, self.compute_detect.size[1]-10), 5, (255, 0, 0), -1) cv2.imshow(self.name_ros_node , image) cv2.waitKey(3) def __show_segmentation(self, results: list) -> None: """Void to show detections using new YOLOv8 method. Args: results (list): Dictionary with all objects detections """ for result in results: ## Create image with bounding boxes annotated_frame = result.plot() ## Show cv2.imshow("Segementation of detected areas", annotated_frame) cv2.waitKey(3) def __callback_image(self, data: Image) -> None: """Callback to set in self variable the values of image. Convert the type of data to opencv image Args: data (Image): Image in format image msg """ current_time = rospy.Time.now() msg_time = data.header.stamp time_diff = current_time - msg_time rospy.loginfo(f"Delay de la red: {time_diff.to_sec()} segundos") if self.last_message_seq is not None: lost_msgs = data.header.seq - self.last_message_seq -1 if lost_msgs >0: rospy.loginfo(f"Se han perdido {lost_msgs} paquetes") self.last_message_seq = data.header.seq image = self.bridge.imgmsg_to_cv2(data, "bgr8") self.frame = image def __run_ia(self, frame: np.ndarray) -> list: """Function to segment a frame with own IA Args: frame (np.ndarray): Frame to predict Returns: list: List of result (masks, boxes, image) that return prediction in model YOLOv8 """ return self.model_ia.predict(source=frame, conf=self.predict_ia_conf, device='cpu')
[docs] def signal_handler(signal, frame) -> None: rospy.logwarn("Ctrl+C detected, stopping goal point thread...") image_process.running = False
if __name__ == '__main__': try: ## Absolute path file_path = os.path.abspath(__file__) ## Absolute path of folder models models_dir = os.path.join(os.path.dirname(file_path), 'models') ## Model IA path model_path = os.path.join(models_dir, 'model_16.pt') ## Call Process Image class image_process = ProcessImage(model_ia_path=model_path) ## Manage SIGINT signal signal.signal(signal.SIGINT, signal_handler) ## Upload thread list_threads = [image_process] ## Start and Join threads [wire.start() for wire in list_threads] [wire.join() for wire in list_threads] except rospy.ROSInterruptException: rospy.loginfo("Close this program") pass