From 3bf1966bfd5b01908cb3b8c7a7686cd02891f598 Mon Sep 17 00:00:00 2001
From: slashtechno <77907286+slashtechno@users.noreply.github.com>
Date: Sat, 14 Oct 2023 15:40:36 -0500
Subject: [PATCH] Manage timers in notify.py

---
 .vscode/launch.json               |   2 +-
 deepface-test.ipynb               |  16 ++--
 pyproject.toml                    |   4 +-
 set-detect-notify/__main__.py     | 113 ++++++++-----------------
 set-detect-notify/utils/notify.py | 134 ++++++++++++++++++++++++++----
 set-detect-notify/utils/utils.py  |  57 ++++++++++++-
 6 files changed, 220 insertions(+), 106 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 3d127f0..6fe5232 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -8,7 +8,7 @@
             "name": "Python: Module",
             "type": "python",
             "request": "launch",
-            "module": "src",
+            "module": "set-detect-notify",
             "justMyCode": true
         }
     ]
diff --git a/deepface-test.ipynb b/deepface-test.ipynb
index a1aff6f..aba1306 100644
--- a/deepface-test.ipynb
+++ b/deepface-test.ipynb
@@ -31,11 +31,12 @@
     "cap = cv2.VideoCapture(0)\n",
     "ret, frame = cap.read()\n",
     "cap.release()\n",
-    "uuid_str = str(uuid.uuid4())\n",
-    "uuid_path = Path(uuid_str + \".jpg\")\n",
-    "cv2.imwrite(str(uuid_path), frame)\n",
-    "dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
-    "\n",
+    "# uuid_str = str(uuid.uuid4())\n",
+    "# uuid_path = Path(uuid_str + \".jpg\")\n",
+    "# cv2.imwrite(str(uuid_path), frame)\n",
+    "# dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
+    "# Don't throw an error if no face is detected (enforce_detection=False)\n",
+    "dfs = DeepFace.find(frame, db_path = \"faces\", enforce_detection=False)\n",
     "# Get the identity of the person\n",
     "for i, pd_dataframe in enumerate(dfs):\n",
     "    # Sort the dataframe by confidence\n",
@@ -44,12 +45,13 @@
     "    print(f'On dataframe {i}')\n",
     "    print(pd_dataframe)\n",
     "    # Get the most likely identity\n",
+    "    # print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
     "    # We could use Path to get the parent directory of the image to use as the identity\n",
-    "    print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
+    "    print(f'Most likely identity: {Path(pd_dataframe.iloc[0][\"identity\"]).parent.name}')\n",
     "    # Get the most likely identity's confidence\n",
     "    print(f'Confidence: {pd_dataframe.iloc[0][\"VGG-Face_cosine\"]}')\n",
     "\n",
-    "uuid_path.unlink()"
+    "# uuid_path.unlink()"
    ]
   },
   {
diff --git a/pyproject.toml b/pyproject.toml
index dbf40bc..7934293 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,4 +38,6 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.ruff]
 # More than the default (88) of `black` to make comments less of a headache
-line-length = 120
+# Where possible, `black` will attempt to format to 88 characters
+# However, setting ruff to 135 will allow for longer lines that can't be auto-formatted
+line-length = 135
diff --git a/set-detect-notify/__main__.py b/set-detect-notify/__main__.py
index defefac..43b3510 100644
--- a/set-detect-notify/__main__.py
+++ b/set-detect-notify/__main__.py
@@ -1,10 +1,8 @@
 # import face_recognition
 import cv2
-import numpy as np
 import dotenv
 from pathlib import Path
 import os
-import time
 
 # import hjson as json
 import torch
@@ -18,11 +16,14 @@ from .utils import utils
 DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
 
 args = None
-object_names = {}
+objects_and_peoples = {
+    "objects": {},
+    "peoples": {},
+}
 
 
 def main():
-    global object_names
+    global objects_and_peoples
     global args
     # RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE")  # Replace this with code to check for gpu

@@ -77,6 +78,15 @@ def main():
         help="The object(s) to detect. Must be something the model is trained to detect",
     )
 
+    argparser.add_argument(
+        "--faces-directory",
+        default=os.environ["FACES_DIRECTORY"]
+        if "FACES_DIRECTORY" in os.environ and os.environ["FACES_DIRECTORY"] != ""
+        else "faces",
+        type=str,
+        help="The directory to store the faces. Should contain 1 subdirectory of images per person",
+    )
+
     stream_source = argparser.add_mutually_exclusive_group()
     stream_source.add_argument(
         "--url",
@@ -95,6 +105,10 @@ def main():
         help="The capture device to use. Can also be a url.",
    )
 
+    # Defaults for the options from here down are already set in notify.py.
+    # Setting them here as well just means argparse will show the default values in its help output
+    # TODO: Perhaps remove the default parameter here and instead state the default in the help message
+
     notifcation_services = argparser.add_argument_group("Notification Services")
     notifcation_services.add_argument(
         "--ntfy-url",
@@ -178,18 +192,18 @@ def main():
             # list of dicts with each dict containing a label, x1, y1, x2, y2
             plot_boxes = []
             # Setup dictionary of object names
-            if not object_names:
+            if objects_and_peoples["objects"] == {} or objects_and_peoples["objects"] is None:
                 for name in r.names.values():
-                    object_names[name] = {
+                    objects_and_peoples["objects"][name] = {
                         "last_detection_time": None,
                         "detection_duration": None,
                         # "first_detection_time": None,
                         "last_notification_time": None,
                     }
-            # Also, make sure that the objects to detect are in the list of object_names
+            # Also, make sure that the objects to detect are in objects_and_peoples["objects"]
             # If it isn't, print a warning
             for obj in args.detect_object:
-                if obj not in object_names:
+                if obj not in objects_and_peoples["objects"]:
                     print(
                         f"Warning: {obj} is not in the list of objects the model can detect!"
                     )
@@ -228,79 +242,18 @@ def main():
                         }
                     )
 
-                    # End goal: Send a notification when an object has been detected for 2 seconds in the past 15 seconds.
-                    # However, don't send a notification if the last notification was less than 15 seconds ago
+                    objects_and_peoples = notify.thing_detected(
+                        thing_name=class_id,
+                        objects_and_peoples=objects_and_peoples,
+                        detection_type="objects",
+                        detection_window=args.detection_window,
+                        detection_duration=args.detection_duration,
+                        notification_window=args.notification_window,
+                        ntfy_url=args.ntfy_url,
+                    )
 
-                    # (re)start cycle
-                    if (
-                        # If the object has not been detected before
-                        object_names[class_id]["last_detection_time"] is None
-                        # If the last detection was more than 15 seconds ago
-                        or time.time() - object_names[class_id]["last_detection_time"]
-                        > args.detection_window
-                    ):
-                        # Set the last detection time to now
-                        object_names[class_id]["last_detection_time"] = time.time()
-                        print(f"First detection of {class_id} in this detection window")
-                        # This line is important. It resets the detection duration when the object hasn't been detected for a while
-                        # If detection duration is None, don't print anything.
-                        # Otherwise, print that the detection duration is being reset due to inactivity
-                        if object_names[class_id]["detection_duration"] is not None:
-                            print(
-                                f"Resetting detection duration for {class_id} since it hasn't been detected for {args.detection_window} seconds"  # noqa: E501
-                            )
-                        object_names[class_id]["detection_duration"] = 0
-                    else:
-                        # Check if the last notification was less than 15 seconds ago
-                        # If it was, then don't do anything
-                        if (
-                            time.time() - object_names[class_id]["last_detection_time"]
-                            <= args.notification_window
-                        ):
-                            pass
-                        # If it was more than 15 seconds ago, reset the detection duration
-                        # This effectively resets the notification timer
-                        else:
-                            print("Notification timer has expired - resetting")
-                            object_names[class_id]["detection_duration"] = 0
-                        object_names[class_id]["detection_duration"] += (
-                            time.time() - object_names[class_id]["last_detection_time"]
-                        )
-                        # print("Updating detection duration")
-                        object_names[class_id]["last_detection_time"] = time.time()
-
-                    # (re)send notification
-                    # Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
-                    if (
-                        object_names[class_id]["detection_duration"]
-                        >= args.detection_duration
-                        and time.time() - object_names[class_id]["last_detection_time"]
-                        <= args.detection_window
-                    ):
-                        # If the last notification was more than 15 seconds ago, then send a notification
-                        if (
-                            object_names[class_id]["last_notification_time"] is None
-                            or time.time()
-                            - object_names[class_id]["last_notification_time"]
-                            > args.notification_window
-                        ):
-                            object_names[class_id]["last_notification_time"] = time.time()
-                            print(
-                                f"Detected {class_id} for {args.detection_duration} seconds"
-                            )
-                            headers = notify.construct_ntfy_headers(
-                                title=f"{class_id} detected",
-                                tag="rotating_light",
-                                priority="default",
-                            )
-                            notify.send_notification(
-                                data=f"{class_id} detected for {args.detection_duration} seconds",
-                                headers=headers,
-                                url=args.ntfy_url,
-                            )
-                            # Reset the detection duration
-                            print("Just sent a notification - resetting detection duration")
-                            object_names[class_id]["detection_duration"] = 0
+                    # TODO: On 10-14-2023, while testing, it seemed the bounding box was too low. Troubleshoot if it's a plotting problem.
+                    # To do so, cross-reference the bounding box drawn by the plot_label function with the one drawn by r.plot()
             frame_to_show = utils.plot_label(
                 boxes=plot_boxes,
                 full_frame=frame,
diff --git a/set-detect-notify/utils/notify.py b/set-detect-notify/utils/notify.py
index efe6b09..780ef5d 100644
--- a/set-detect-notify/utils/notify.py
+++ b/set-detect-notify/utils/notify.py
@@ -1,5 +1,122 @@
-import datetime
 import httpx
+import time
+
+
+'''
+Structure of objects_and_peoples
+Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated.
+TODO: Make a function to check if a person is in the objects dictionary and vice versa
+{
+    "objects": {
+        "object_name": {
+            "last_detection_time": float,
+            "detection_duration": float,
+            "last_notification_time": float,
+        },
+    },
+    "peoples": {
+        "person_name": {
+            "last_detection_time": float,
+            "detection_duration": float,
+            "last_notification_time": float,
+        },
+    },
+}
+'''
+# objects_and_peoples = {}
+
+
+def thing_detected(
+    thing_name: str,
+    objects_and_peoples: dict,
+    detection_type: str = "objects",
+    detection_window: int = 15,
+    detection_duration: int = 2,
+    notification_window: int = 15,
+    ntfy_url: str = "https://ntfy.sh/set-detect-notify"
+    ) -> dict:
+    '''
+    A function to check that a thing has been detected for `detection_duration` seconds within the past `detection_window` seconds,
+    sending notifications at most once per `notification_window` seconds.
+    Takes a dict that will be returned with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY
+    '''
+
+    # "Alias" the objects and peoples dictionaries so it's easier to work with
+    respective_type = objects_and_peoples[detection_type]
+
+    # (re)start cycle
+    if (
+        # If the object has not been detected before
+        respective_type[thing_name]["last_detection_time"] is None
+        # If the last detection was more than `detection_window` seconds ago
+        or time.time() - respective_type[thing_name]["last_detection_time"]
+        > detection_window
+    ):
+        # Set the last detection time to now
+        respective_type[thing_name]["last_detection_time"] = time.time()
+        print(f"First detection of {thing_name} in this detection window")
+        # This line is important. It resets the detection duration when the object hasn't been detected for a while
+        # If detection duration is None, don't print anything.
+        # Otherwise, print that the detection duration is being reset due to inactivity
+        if respective_type[thing_name]["detection_duration"] is not None:
+            print(
+                f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds"  # noqa: E501
+            )
+        respective_type[thing_name]["detection_duration"] = 0
+    else:
+        # Check if the last notification was less than `notification_window` seconds ago
+        # If it was, then don't do anything
+        if (
+            time.time() - respective_type[thing_name]["last_detection_time"]
+            <= notification_window
+        ):
+            pass
+        # If it was more than `notification_window` seconds ago, reset the detection duration
+        # This effectively resets the notification timer
+        else:
+            print("Notification timer has expired - resetting")
+            respective_type[thing_name]["detection_duration"] = 0
+        respective_type[thing_name]["detection_duration"] += (
+            time.time() - respective_type[thing_name]["last_detection_time"]
+        )
+        # print("Updating detection duration")
+        respective_type[thing_name]["last_detection_time"] = time.time()
+
+    # (re)send notification
+    # Check if detection has been ongoing for `detection_duration` seconds or more in the past `detection_window` seconds
+    if (
+        respective_type[thing_name]["detection_duration"]
+        >= detection_duration
+        and time.time() - respective_type[thing_name]["last_detection_time"]
+        <= detection_window
+    ):
+        # If the last notification was more than `notification_window` seconds ago, then send a notification
+        if (
+            respective_type[thing_name]["last_notification_time"] is None
+            or time.time()
+            - respective_type[thing_name]["last_notification_time"]
+            > notification_window
+        ):
+            respective_type[thing_name]["last_notification_time"] = time.time()
+            print(
+                f"Detected {thing_name} for {detection_duration} seconds"
+            )
+            headers = construct_ntfy_headers(
+                title=f"{thing_name} detected",
+                tag="rotating_light",
+                priority="default",
+            )
+            send_notification(
+                data=f"{thing_name} detected for {detection_duration} seconds",
+                headers=headers,
+                url=ntfy_url,
+            )
+            # Reset the detection duration
+            print("Just sent a notification - resetting detection duration")
+            respective_type[thing_name]["detection_duration"] = 0
+
+    # Write the aliased respective_type back into objects_and_peoples
+    objects_and_peoples[detection_type] = respective_type
+    return objects_and_peoples
 
 
 def construct_ntfy_headers(
@@ -15,18 +132,3 @@ def send_notification(data: str, headers: dict, url: str):
         raise ValueError("url and data cannot be None")
 
     httpx.post(url, data=data.encode("utf-8"), headers=headers)
-
-def check_last_seen(last_seen: datetime.datetime, seconds: int = 15):
-    """
-    Check if a time is older than a given number of seconds
-    If it is, return True
-    If last_seen is empty/null, return True
-    """
-    if (
-        datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds)
-        or last_seen == ""
-        or last_seen is None
-    ):
-        return True
-    else:
-        return False
diff --git a/set-detect-notify/utils/utils.py b/set-detect-notify/utils/utils.py
index 82cac73..7f168c1 100644
--- a/set-detect-notify/utils/utils.py
+++ b/set-detect-notify/utils/utils.py
@@ -1,6 +1,7 @@
 import cv2
 import numpy as np
-
+from pathlib import Path
+from deepface import DeepFace
 
 def plot_label(
     # list of dicts with each dict containing a label, x1, y1, x2, y2
@@ -55,3 +56,57 @@ def plot_label(
         1,
     )
     return view_frame
+
+
+def recognize_face(
+    path_to_directory: Path = Path("faces"),
+    # opencv image
+    run_frame: np.ndarray = None,
+) -> list:
+    '''
+    Accepts a path to a directory of images of faces to be used as a reference
+    In addition, accepts an opencv image to be used as the frame to be searched
+
+    Returns a list containing a single dictionary, as currently only 1 face can be detected in each frame
+    The dict contains the following keys: label, x1, y1, x2, y2
+    The directory should be structured as follows:
+    faces/
+        name/
+            image1.jpg
+            image2.jpg
+            image3.jpg
+        name2/
+            image1.jpg
+            image2.jpg
+            image3.jpg
+    (not necessarily jpgs, but you get the idea)
+
+    Point is, `name` is the name of the person in the images in the directory `name`
+    That name will be used as the label for the face in the frame
+    '''
+    # face_dataframes is a vanilla list of dataframes
+    face_dataframes = DeepFace.find(run_frame, db_path=str(path_to_directory))
+    # Iterate over the dataframes
+    for df in face_dataframes:
+        # The last row is the highest confidence
+        # So we can just grab the path from there
+        # iloc = Integer LOCation
+        path_to_image = Path(df.iloc[-1]["identity"])
+        # Get the name of the parent directory
+        label = path_to_image.parent.name
+        # Return the coordinates of the box in xyxy format, rather than xywh
+        # This is because YOLO uses xyxy, and that's the format plot_label expects
+        # Also, xyxy is just the top left and bottom right corners of the box
+        coordinates = {
+            "x1": df.iloc[-1]["source_x"],
+            "y1": df.iloc[-1]["source_y"],
+            "x2": df.iloc[-1]["source_x"] + df.iloc[-1]["source_w"],
+            "y2": df.iloc[-1]["source_y"] + df.iloc[-1]["source_h"],
+        }
+
+    return [dict(label=label, **coordinates)]
+
+    '''
+    Example dataframe, for reference
+    identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
+    '''
\ No newline at end of file
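
Below is a rough, illustrative sketch (not applied by this patch) of how the pieces added here could be wired together in __main__.py's detection loop: recognize_face() supplies labelled face boxes, and thing_detected() keeps the per-person timers as long as the returned dict is saved. The helper name process_faces and the pre-seeding of "peoples" entries are assumptions for illustration; only thing_detected(), recognize_face(), and the CLI options above come from the patch.

    # Illustrative sketch only. `frame` is the OpenCV image for the current iteration,
    # `args` holds the parsed CLI options, and the relative imports mirror __main__.py.
    from pathlib import Path

    from .utils import notify, utils

    def process_faces(frame, args, objects_and_peoples):
        # recognize_face() returns [{"label": name, "x1": ..., "y1": ..., "x2": ..., "y2": ...}]
        face_boxes = utils.recognize_face(
            path_to_directory=Path(args.faces_directory), run_frame=frame
        )
        for box in face_boxes:
            # Make sure the person has a timer entry before handing it to thing_detected(),
            # since thing_detected() indexes objects_and_peoples["peoples"][thing_name] directly
            objects_and_peoples["peoples"].setdefault(
                box["label"],
                {
                    "last_detection_time": None,
                    "detection_duration": None,
                    "last_notification_time": None,
                },
            )
            # Save the returned dict -- the updated timers live in it
            objects_and_peoples = notify.thing_detected(
                thing_name=box["label"],
                objects_and_peoples=objects_and_peoples,
                detection_type="peoples",
                detection_window=args.detection_window,
                detection_duration=args.detection_duration,
                notification_window=args.notification_window,
                ntfy_url=args.ntfy_url,
            )
        # The face boxes can then be appended to plot_boxes before utils.plot_label() is called
        return face_boxes, objects_and_peoples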