diff --git a/wyzely_detect/__main__.py b/wyzely_detect/__main__.py index 7f6f1bd..d17be86 100644 --- a/wyzely_detect/__main__.py +++ b/wyzely_detect/__main__.py @@ -7,17 +7,12 @@ import cv2 import torch from ultralytics import YOLO -from .utils import notify, utils +from .utils import utils from .utils.cli_args import argparser DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" args = None -objects_and_peoples = { - "objects": {}, - "peoples": {}, -} - def main(): global objects_and_peoples @@ -70,121 +65,32 @@ def main(): while True: # Grab a single frame of video ret, frame = video_capture.read() - # Resize frame of video to a smaller size for faster recognition processing - run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale) - # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale) - results = model(run_frame, verbose=False) + frame_to_show = utils.process_footage( + frame = frame, + run_scale = args.run_scale, + view_scale = args.view_scale, - path_to_faces = Path(args.faces_directory) - path_to_faces_exists = path_to_faces.is_dir() + faces_directory=Path(args.faces_directory), + face_confidence_threshold=args.face_confidence_threshold, + no_remove_representations=args.no_remove_representations, - for i, r in enumerate(results): - # list of dicts with each dict containing a label, x1, y1, x2, y2 - plot_boxes = [] + detection_window=args.detection_window, + detection_duration=args.detection_duration, + notification_window=args.notification_window, - # The following is stuff for people - # This is still in the for loop as each result, no matter if anything is detected, will be present. - # Thus, there will always be one result (r) + ntfy_url=args.ntfy_url, - # Only run if path_to_faces exists - # May be better to check every iteration, but this also works - if path_to_faces_exists: - if face_details := utils.recognize_face( - path_to_directory=path_to_faces, - run_frame=run_frame, - min_confidence=args.face_confidence_threshold, - no_remove_representations=args.no_remove_representations, - ): - plot_boxes.append(face_details) - objects_and_peoples = notify.thing_detected( - thing_name=face_details["label"], - objects_and_peoples=objects_and_peoples, - detection_type="peoples", - detection_window=args.detection_window, - detection_duration=args.detection_duration, - notification_window=args.notification_window, - ntfy_url=args.ntfy_url, - ) - - # The following is stuff for objects - # Setup dictionary of object names - if ( - objects_and_peoples["objects"] == {} - or objects_and_peoples["objects"] is None - ): - for name in r.names.values(): - objects_and_peoples["objects"][name] = { - "last_detection_time": None, - "detection_duration": None, - # "first_detection_time": None, - "last_notification_time": None, - } - # Also, make sure that the objects to detect are in the list of objects_and_peoples - # If it isn't, print a warning - for obj in args.detect_object: - if obj not in objects_and_peoples: - print( - f"Warning: {obj} is not in the list of objects the model can detect!" - ) - - for box in r.boxes: - # Get the name of the object - class_id = r.names[box.cls[0].item()] - # Get the coordinates of the object - cords = box.xyxy[0].tolist() - cords = [round(x) for x in cords] - # Get the confidence - conf = round(box.conf[0].item(), 2) - # Print it out, adding a spacer between each object - # print("Object type:", class_id) - # print("Coordinates:", cords) - # print("Probability:", conf) - # print("---") - - # Now do stuff (if conf > 0.5) - if conf < args.object_confidence_threshold or ( - class_id not in args.detect_object and args.detect_object != [] - ): - # If the confidence is too low - # or if the object is not in the list of objects to detect and the list of objects to detect is not empty - # then skip this iteration - continue - - # Add the object to the list of objects to plot - plot_boxes.append( - { - "label": class_id, - "x1": cords[0], - "y1": cords[1], - "x2": cords[2], - "y2": cords[3], - } - ) - - objects_and_peoples = notify.thing_detected( - thing_name=class_id, - objects_and_peoples=objects_and_peoples, - detection_type="objects", - detection_window=args.detection_window, - detection_duration=args.detection_duration, - notification_window=args.notification_window, - ntfy_url=args.ntfy_url, - ) - - # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot() - frame_to_show = utils.plot_label( - boxes=plot_boxes, - full_frame=frame, - # full_frame=r.plot(), - run_scale=args.run_scale, - view_scale=args.view_scale, - ) - - # Display the resulting frame - # cv2.imshow("", r) - if not args.no_display: - cv2.imshow(f"Video{i}", frame_to_show) + model=model, + detect_object=args.detect_object, + object_confidence_threshold=args.object_confidence_threshold, + ) + # Display the resulting frame + # TODO: When multi-camera support is added, this needs to be changed to allow all feeds + if not args.no_display: + # When a face isn't recognized: "cv2.error: OpenCV(4.8.1) D:\a\opencv-python\opencv-python\opencv\modules\highgui\src\window.cpp:971: error: (-215:Assertion failed) size.width>0 && size.height>0 in function 'cv::imshow'" + # Seems to be because frame_to_show is null + cv2.imshow("Video", frame_to_show) # Hit 'q' on the keyboard to quit! if cv2.waitKey(1) & 0xFF == ord("q"): diff --git a/wyzely_detect/utils/cli_args.py b/wyzely_detect/utils/cli_args.py index b18d20f..0b6eaf0 100644 --- a/wyzely_detect/utils/cli_args.py +++ b/wyzely_detect/utils/cli_args.py @@ -92,6 +92,7 @@ def set_argparse(): help="The URL to send notifications to", ) + # Various timers timers = argparser.add_argument_group("Timers") timers.add_argument( "--detection-duration", diff --git a/wyzely_detect/utils/utils.py b/wyzely_detect/utils/utils.py index 19ea7ea..bc6a3b0 100644 --- a/wyzely_detect/utils/utils.py +++ b/wyzely_detect/utils/utils.py @@ -2,9 +2,165 @@ import cv2 import numpy as np from pathlib import Path from deepface import DeepFace +from . import notify first_face_try = True +# TODO: When multi-camera support is added, this will need to be changed so that each camera has its own dict +objects_and_peoples = { + "objects": {}, + "peoples": {}, +} + + +def process_footage( + # Frame + frame: np.ndarray = None, + + # scale + run_scale: float = None, + view_scale: float = None, + + # Face stuff + faces_directory: str = None, + face_confidence_threshold: float = None, + no_remove_representations: bool = False, + + # Timer stuff + detection_window: int = None, + detection_duration: int = None, + notification_window: int = None, + + ntfy_url: str = None, + + # Object stuff + # YOLO object + model = None, + detect_object: list = None, + object_confidence_threshold = None +) -> np.ndarray: + """ + Takes in a frame and processes it + """ + + global objects_and_peoples + + + # Resize frame of video to a smaller size for faster recognition processing + run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale) + # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale) + + results = model(run_frame, verbose=False) + + path_to_faces = Path(faces_directory) + path_to_faces_exists = path_to_faces.is_dir() + + for i, r in enumerate(results): + # list of dicts with each dict containing a label, x1, y1, x2, y2 + plot_boxes = [] + + # The following is stuff for people + # This is still in the for loop as each result, no matter if anything is detected, will be present. + # Thus, there will always be one result (r) + + # Only run if path_to_faces exists + # May be better to check every iteration, but this also works + if path_to_faces_exists: + if face_details := recognize_face( + path_to_directory=path_to_faces, + run_frame=run_frame, + # Perhaps make these names match? + min_confidence=face_confidence_threshold, + no_remove_representations=no_remove_representations, + ): + plot_boxes.append(face_details) + objects_and_peoples = notify.thing_detected( + thing_name=face_details["label"], + objects_and_peoples=objects_and_peoples, + detection_type="peoples", + detection_window=detection_window, + detection_duration=detection_duration, + notification_window=notification_window, + ntfy_url=ntfy_url, + ) + + # The following is stuff for objects + # Setup dictionary of object names + if ( + objects_and_peoples["objects"] == {} + or objects_and_peoples["objects"] is None + ): + for name in r.names.values(): + objects_and_peoples["objects"][name] = { + "last_detection_time": None, + "detection_duration": None, + # "first_detection_time": None, + "last_notification_time": None, + } + # Also, make sure that the objects to detect are in the list of objects_and_peoples + # If it isn't, print a warning + for obj in detect_object: + if obj not in objects_and_peoples: + print( + f"Warning: {obj} is not in the list of objects the model can detect!" + ) + + for box in r.boxes: + # Get the name of the object + class_id = r.names[box.cls[0].item()] + # Get the coordinates of the object + cords = box.xyxy[0].tolist() + cords = [round(x) for x in cords] + # Get the confidence + conf = round(box.conf[0].item(), 2) + # Print it out, adding a spacer between each object + # print("Object type:", class_id) + # print("Coordinates:", cords) + # print("Probability:", conf) + # print("---") + + # Now do stuff (if conf > 0.5) + if conf < object_confidence_threshold or ( + class_id not in detect_object and detect_object != [] + ): + # If the confidence is too low + # or if the object is not in the list of objects to detect and the list of objects to detect is not empty + # then skip this iteration + continue + + # Add the object to the list of objects to plot + plot_boxes.append( + { + "label": class_id, + "x1": cords[0], + "y1": cords[1], + "x2": cords[2], + "y2": cords[3], + } + ) + + objects_and_peoples = notify.thing_detected( + thing_name=class_id, + objects_and_peoples=objects_and_peoples, + detection_type="objects", + detection_window=detection_window, + detection_duration=detection_duration, + notification_window=notification_window, + ntfy_url=ntfy_url, + ) + + + # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot() + frame_to_show = plot_label( + boxes=plot_boxes, + full_frame=frame, + # full_frame=r.plot(), + run_scale=run_scale, + view_scale=view_scale, + ) + # Unsure if this should also return the objects_and_peoples dict + return frame_to_show + def plot_label( # list of dicts with each dict containing a label, x1, y1, x2, y2 @@ -18,7 +174,7 @@ def plot_label( # So the coordinates will be scaled appropriately when coming from run_frame view_scale: float = None, font: int = cv2.FONT_HERSHEY_SIMPLEX, -): +) -> np.ndarray: # x1 and y1 are the top left corner of the box # x2 and y2 are the bottom right corner of the box # Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25 @@ -176,6 +332,7 @@ def recognize_face( f"Cosine similarity: {cosine_similarity}, filname: {path_to_image.name}, to_return: {to_return}" ) return to_return + return None """ Example dataframe, for reference