wyzely-detect/wyzely_detect/__main__.py

148 lines
5.7 KiB
Python

# import face_recognition
from pathlib import Path
import cv2
from prettytable import PrettyTable
# import hjson as json
import torch
from ultralytics import YOLO
from .utils import utils
from .utils.cli_args import argparser
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
args = None
def main():
global objects_and_peoples
global args
args = argparser.parse_args()
# Check if a CUDA GPU is available. If it is, set it via torch. If not, set it to cpu
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
# Currently, I have been unable to set up Poetry to use GPU for Torch
for i in range(torch.cuda.device_count()):
print(f'Using {torch.cuda.get_device_properties(i).name} for pytorch')
if torch.cuda.is_available():
torch.cuda.set_device(0)
print("Set CUDA device")
else:
print("No CUDA device available, using CPU")
# Seems automatically, deepface (tensorflow) tried to use my GPU on Pop!_OS (I did not set up cudnn or anything)
# Not sure the best way, in Poetry, to manage GPU libraries so for now, just use CPU
if args.force_disable_tensorflow_gpu:
print("Forcing tensorflow to use CPU")
import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
if tf.config.experimental.list_logical_devices('GPU'):
print('GPU disabled unsuccessfully')
else:
print("GPU disabled successfully")
model = YOLO("yolov8n.pt")
# Depending on if the user wants to use a stream or a capture device,
# Set the video capture to the appropriate source
if not args.rtsp_url and not args.capture_device:
print("No stream or capture device set, defaulting to capture device 0")
video_sources = {
"devices": [cv2.VideoCapture(0)]
}
else:
video_sources = {
"streams": [cv2.VideoCapture(url) for url in args.rtsp_url],
"devices": [cv2.VideoCapture(device) for device in args.capture_device],
}
if args.fake_second_source:
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No capture device to use as second source. Trying stream.")
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No stream to use as a second source")
# When the code tries to resize the nonexistent capture device 1, the program will fail
# Eliminate lag by setting the buffer size to 1
# This makes it so that the video capture will only grab the most recent frame
# However, this means that the video may be choppy
# Only do this for streams
try:
for stream in video_sources["streams"]:
stream.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# If there are no streams, this will throw a KeyError
except KeyError:
pass
# Print out the resolution of the video sources. Ideally, change this so the device ID/url is also printed
pretty_table = PrettyTable(field_names=["Source Type", "Resolution"])
for source_type, sources in video_sources.items():
for source in sources:
if source.get(cv2.CAP_PROP_FRAME_WIDTH) == 0 or source.get(cv2.CAP_PROP_FRAME_HEIGHT) == 0:
message = "Capture for a source failed as resolution is 0x0.\n"
if source_type == "streams":
message += "Check if the stream URL is correct and if the stream is online."
else:
message += "Check if the capture device is connected, working, and not in use by another program."
print(message)
# Maybe use os.exit() instead?
exit(1)
pretty_table.add_row(
[source_type, f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}"]
)
print(pretty_table)
print
print("Beginning video capture...")
while True:
# Grab a single frame of video
frames = []
# frames = [source.read() for sources in video_sources.values() for source in sources]
for list_of_sources in video_sources.values():
frames.extend([source.read()[1] for source in list_of_sources])
frames_to_show = []
for frame in frames:
frames_to_show.append(utils.process_footage(
frame = frame,
run_scale = args.run_scale,
view_scale = args.view_scale,
faces_directory=Path(args.faces_directory),
face_confidence_threshold=args.face_confidence_threshold,
no_remove_representations=args.no_remove_representations,
detection_window=args.detection_window,
detection_duration=args.detection_duration,
notification_window=args.notification_window,
ntfy_url=args.ntfy_url,
model=model,
detect_object=args.detect_object,
object_confidence_threshold=args.object_confidence_threshold,
))
# Display the resulting frame
# TODO: When multi-camera support is added, this needs to be changed to allow all feeds
if not args.no_display:
for i, frame_to_show in enumerate(frames_to_show):
cv2.imshow(f"Video {i}", frame_to_show)
# Hit 'q' on the keyboard to quit!
if cv2.waitKey(1) & 0xFF == ord("q"):
break
# Release handle to the webcam
print("Releasing video capture")
[source.release() for sources in video_sources.values() for source in sources]
cv2.destroyAllWindows()
if __name__ == "__main__":
main()