Source code for ROS_Stack.jetracer_speedway_sensors.src.camera_publish

#!/usr/bin/env python3

#
# This file is part of the repo: https://github.com/marqinhos/JetRacer_Autonomous_Driving
# If you find the code useful, please cite the Author: Marcos Fernandez Gonzalez
# 
# Copyright 2023 The JetRacer Autonomous Driving Author. All Rights Reserved.
#
# Licensed under the AGPL-3.0 License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.gnu.org/licenses/agpl-3.0.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================================


import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
import depthai as dai
import time
import os 
import threading as th
import sys
import yaml


[docs] class Camera_OAK(th.Thread): def __init__(self) -> "Camera_OAK": ########################### Threads ########################### # Initialize thread th.Thread.__init__(self) ########################### IMAGE ########################### self.width = 640 self.height = 480 self.frame = np.zeros((self.width, self.height)) ########################### OS ########################### self.running = True ########################### CAMERA ########################### self.device = None @staticmethod def __show_specs(device) -> None: print('Device name:', device.getDeviceName()) # Bootloader version if device.getBootloaderVersion() is not None: print('Bootloader version:', device.getBootloaderVersion()) # Print out usb speed print('Usb speed:', device.getUsbSpeed().name) # Connected cameras print('Connected cameras:', device.getConnectedCameraFeatures())
[docs] def run(self) -> None: try: with dai.Device() as device: self.device = device self.__show_specs(device) # Create pipeline pipeline = dai.Pipeline() cams = device.getConnectedCameraFeatures() streams = [] for cam in cams: """Initialize camera """ print(str(cam), str(cam.socket), cam.socket) c = pipeline.create(dai.node.Camera) x = pipeline.create(dai.node.XLinkOut) c.isp.link(x.input) c.setBoardSocket(cam.socket) stream = str(cam.socket) if cam.name: stream = f'{cam.name} ({stream})' x.setStreamName(stream) streams.append(stream) # Start pipeline device.startPipeline(pipeline) fpsCounter = {} while not device.isClosed() and self.running: if self.running is False: raise Exception queueNames = device.getQueueEvents(streams) for stream in queueNames: messages = device.getOutputQueue(stream).tryGetAll() fpsCounter[stream] = fpsCounter.get(stream, 0.0) + len(messages) for message in messages: # Display arrived frames if type(message) == dai.ImgFrame: frame = message.getCvFrame() resize_points = (self.width, self.height) frame = cv2.resize(frame, resize_points, interpolation= cv2.INTER_LINEAR) self.frame = frame except Exception as e: rospy.loginfo("Close Camera") device.close() finally: rospy.loginfo("Close Camera") device.close()
[docs] def get_frame(self) -> np.ndarray: """Function to get the image save in the buffer Returns: np.ndarray: Return the last image save in the buffer """ return self.frame
[docs] class ImagePublisher(th.Thread): """Class to publish in a topic cam image""" def __init__(self, camera: Camera_OAK) -> "ImagePublisher": ########################### Threads ########################### # Initialize thread th.Thread.__init__(self) ########################### YAML ########################### # Load config.yaml yaml_path = rospy.get_param('config_file') with open(yaml_path, 'r') as f: config = yaml.safe_load(f) ########################### ROS ########################### ## Constants self.name_pub = config["sensors"]["pub_name_img"] self.name_ros_node = config["sensors"]["node_name_img_oak"] ## Initialize node of ros rospy.init_node(self.name_ros_node) self.pub_image = rospy.Publisher(self.name_pub, Image, queue_size=1) ## Rate self.rate = config["rate"] self.rospyRate = rospy.Rate(self.rate) ########################### IMAGE ########################### self.bridge = CvBridge() self.camera = camera ########################### OS ########################### self.running = True
[docs] def run(self) -> None: rospy.loginfo("Publishing Image...") while not rospy.is_shutdown() and self.running: try: self.__publish_image() except: rospy.logwarn("Image is not Published") self.rospyRate.sleep() ## Close camera self.camera.running = False rospy.loginfo("Shutdown")
def __publish_image(self) -> None: """Void to publish image """ image = self.camera.get_frame() ## Check image if image is not None: ## Convert opencv image to image msg to publish ros_image = self.bridge.cv2_to_imgmsg(image, 'bgr8') self.pub_image.publish(ros_image)
[docs] def signal_handler(signal, frame) -> None: print("Ctrl+C detected, stopping threads...") image_publisher.running = False sys.exit(0)
if __name__ == '__main__': try: camera = Camera_OAK() image_publisher = ImagePublisher(camera) list_run = [camera, image_publisher] [item.start() for item in list_run] [item.join() for item in list_run] except rospy.ROSInterruptException: pass