Source code for ROS_Stack.jetracer_speedway_navigation.src.security_stop
#!/usr/bin/env python3
#
# This file is part of the repo: https://github.com/marqinhos/JetRacer_Autonomous_Driving
# If you find the code useful, please cite the Author: Marcos Fernandez Gonzalez
#
# Copyright 2023 The JetRacer Autonomous Driving Author. All Rights Reserved.
#
# Licensed under the AGPL-3.0 License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.gnu.org/licenses/agpl-3.0.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================================
import rospy
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Bool
import numpy as np
import yaml
import threading as th
import signal
[docs]
class SecurityStop(th.Thread):
def __init__(self) -> "SecurityStop":
########################### Threads ###########################
# Initialize thread
th.Thread.__init__(self)
########################### YAML ###########################
# Load config.yaml
yaml_path = rospy.get_param('config_file')
with open(yaml_path, 'r') as f:
config = yaml.safe_load(f)
########################### DISTANCES ###########################
## Set distance constants
self.MIN_CENTER = config["navigation"]["min_center"]
self.MIN_BACK = config["navigation"]["min_back"]
self.MIN_SIDES = config["navigation"]["min_sides"]
########################### SCAN ###########################
self.ranges = None
########################### Running ###########################
self.running = True
########################### ROS ###########################
## Constants
self.name_ros_node = config["navigation"]["node_name_obs"]
self.name_sub = config["sensors"]["pub_name_rplidar"]
self.name_pub = config["navigation"]["pub_name_obs"]
## Initialize node of ros
rospy.init_node(self.name_ros_node)
## Rate
self.rate = config["rate"]
self.rospyRate = rospy.Rate(self.rate)
self.obstacle_pub = rospy.Publisher(self.name_pub, Bool, queue_size=1)
rospy.wait_for_message(self.name_sub, LaserScan)
self.scan_sub = rospy.Subscriber(self.name_sub, LaserScan, self.__callback_scan)
[docs]
def run(self) -> None:
while self.running:
if self.ranges is None:
self.rospyRate.sleep()
continue
self.publish_warning(self.ranges)
self.rospyRate.sleep()
[docs]
def publish_warning(self, ranges:list) -> None:
laser_beam_c = ranges[0:80] + ranges[len(ranges)-80:len(ranges)]
laser_beam_l = ranges[80:280]
laser_beam_b = ranges[280:440]
laser_beam_r = ranges[440:640]
# laser_beam_c = [0.1 if laser_beam_c[i] > 99 else laser_beam_c[i] for i in range(len(laser_beam_c))]
# mean_center = sum(laser_beam_c)/len(laser_beam_c)
smallest_10 = sorted(laser_beam_c)[:10]
mean = sum(smallest_10) / len(smallest_10)
if mean <= self.MIN_CENTER:
rospy.loginfo("OBSTACLE IN FRONT")
self.obstacle_pub.publish(True)
else:
self.obstacle_pub.publish(False)
def __callback_scan(self, msg: LaserScan) -> None:
self.ranges = msg.ranges
[docs]
def signal_handler(signal, frame):
rospy.logwarn("Ctrl+C detected, stopping emergency stop thread...")
security.running = False
if __name__ == '__main__':
try:
## Call JetRacer and Brain classes
security = SecurityStop()
## Manage SIGINT signal
signal.signal(signal.SIGINT, signal_handler)
## Upload threads
list_threads = [security]
## Start and Join threads
[wire.start() for wire in list_threads]
[wire.join() for wire in list_threads]
except rospy.ROSInterruptException:
rospy.loginfo("Close this program")
pass