Source code for ROS_Stack.jetracer_speedway_drivers.src.jetracer_driver
#!/usr/bin/env python3
#
# This file is part of the repo: https://github.com/marqinhos/JetRacer_Autonomous_Driving
# If you find the code useful, please cite the Author: Marcos Fernandez Gonzalez
#
# Copyright 2023 The JetRacer Autonomous Driving Author. All Rights Reserved.
#
# Licensed under the AGPL-3.0 License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.gnu.org/licenses/agpl-3.0.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================================
from utils import JetRacer
import rospy
from jetracer_speedway_msgs.msg import Velocities
from std_msgs.msg import Bool
import threading as th
import signal
import yaml
[docs]
class Driver(th.Thread):
def __init__(self, jetracer: JetRacer) -> "Driver":
########################### Threads ###########################
# Initialize thread
th.Thread.__init__(self)
########################### YAML ###########################
# Load config.yaml
yaml_path = rospy.get_param('config_file')
with open(yaml_path, 'r') as f:
config = yaml.safe_load(f)
########################### JetRacer ###########################
self.jetracer = jetracer
self.vel = 0
self.angle = 0
########################### Running ###########################
self.running = True
self.emergency_stop = False
########################### ROS ###########################
## Constants
self.name_sub_vels = config["control"]["pub_name"]
self.name_sub_emergency = config["navigation"]["pub_name_obs"]
self.name_ros_node = config["driver"]["node_name"]
## Initialize node of ros
rospy.init_node(self.name_ros_node)
## Rate
self.rate = config["rate"]
self.rospyRate = rospy.Rate(self.rate)
## Publisher and Subscribers
rospy.wait_for_message(self.name_sub_vels, Velocities)
self.vels_sub = rospy.Subscriber(self.name_sub_vels, Velocities, self.__callback_vels)
rospy.wait_for_message(self.name_sub_emergency, Bool)
self.emer_sub = rospy.Subscriber(self.name_sub_emergency, Bool, self.__callback_emergency)
[docs]
def run(self) -> None:
"""Void to set velocities into JetRacer
"""
rospy.loginfo("Setting velocities to JetRacer...")
while self.running:
if not self.emergency_stop:
self.jetracer.set_vel(self.vel)
self.jetracer.set_angle(self.angle)
self.rospyRate.sleep()
else:
self.jetracer.set_vel(0)
self.jetracer.set_angle(0)
self.rospyRate.sleep()
self.jetracer.set_vel(0)
self.jetracer.set_angle(0)
def __callback_vels(self, msg: Velocities) -> None:
"""Callback to get the linear and angular vels, to set in JetRacer
Args:
msg (Twist): Message to get to subscribe to the jetracer_vels topic
"""
self.vel = msg.linear
self.angle = msg.angular
def __callback_emergency(self, msg: Bool) -> None:
"""Callback to set emergency stop to JetRacer
Args:
msg (Bool): Message to get to subscribe to the jetracer_obstacle_detected
"""
"""
if msg.data is True:
rospy.loginfo("EMERGENCY STOP")
self.emergency_stop = True
## self.jetracer.stop_emergency()
elif msg.data is False:
if self.emergency_stop is True:
self.emergency_stop = False
## self.jetracer.run_stop_emergency = False
else:
pass
"""
self.emergency_stop = msg.data
[docs]
def signal_handler(signal, frame):
rospy.logwarn("Ctrl+C detected, stopping threads...")
driver_control.jetracer.set_vel(0)
driver_control.jetracer.set_angle(0)
driver_control.running = False
driver_control.jetracer.stop()
if __name__ == '__main__':
try:
## Call JetRacer and Brain classes
jetracer = JetRacer()
driver_control = Driver(jetracer)
## Manage SIGINT signal
signal.signal(signal.SIGINT, signal_handler)
## Set rate to JetRacer
jetracer.set_rate(driver_control.rospyRate)
## Upload threads
list_threads = [jetracer, driver_control]
## Start and Join threads
[wire.start() for wire in list_threads]
[wire.join() for wire in list_threads]
except rospy.ROSInterruptException:
rospy.loginfo("Close this program")
pass