Manage timers in notify.py
This commit is contained in:
parent
3c6919d2c6
commit
3bf1966bfd
|
@ -8,7 +8,7 @@
|
|||
"name": "Python: Module",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"module": "src",
|
||||
"module": "set-detect-notify",
|
||||
"justMyCode": true
|
||||
}
|
||||
]
|
||||
|
|
|
@ -31,11 +31,12 @@
|
|||
"cap = cv2.VideoCapture(0)\n",
|
||||
"ret, frame = cap.read()\n",
|
||||
"cap.release()\n",
|
||||
"uuid_str = str(uuid.uuid4())\n",
|
||||
"uuid_path = Path(uuid_str + \".jpg\")\n",
|
||||
"cv2.imwrite(str(uuid_path), frame)\n",
|
||||
"dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
|
||||
"\n",
|
||||
"# uuid_str = str(uuid.uuid4())\n",
|
||||
"# uuid_path = Path(uuid_str + \".jpg\")\n",
|
||||
"# cv2.imwrite(str(uuid_path), frame)\n",
|
||||
"# dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
|
||||
"# Don't throw an error if no face is detected (enforce_detection=False)\n",
|
||||
"dfs = DeepFace.find(frame, db_path = \"faces\", enforce_detection=False)\n",
|
||||
"# Get the identity of the person\n",
|
||||
"for i, pd_dataframe in enumerate(dfs):\n",
|
||||
" # Sort the dataframe by confidence\n",
|
||||
|
@ -44,12 +45,13 @@
|
|||
" print(f'On dataframe {i}')\n",
|
||||
" print(pd_dataframe)\n",
|
||||
" # Get the most likely identity\n",
|
||||
" # print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
|
||||
" # We could use Path to get the parent directory of the image to use as the identity\n",
|
||||
" print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
|
||||
" print(f'Most likely identity: {Path(pd_dataframe.iloc[0][\"identity\"]).parent.name}')\n",
|
||||
" # Get the most likely identity's confidence\n",
|
||||
" print(f'Confidence: {pd_dataframe.iloc[0][\"VGG-Face_cosine\"]}')\n",
|
||||
"\n",
|
||||
"uuid_path.unlink()"
|
||||
"# uuid_path.unlink()"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -38,4 +38,6 @@ build-backend = "poetry.core.masonry.api"
|
|||
|
||||
[tool.ruff]
|
||||
# More than the default (88) of `black` to make comments less of a headache
|
||||
line-length = 120
|
||||
# Where possible, `black` will attempt to format to 88 characters
|
||||
# However, setting ruff to 135 will allow for longer lines that can't be auto-formatted
|
||||
line-length = 135
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
# import face_recognition
|
||||
import cv2
|
||||
import numpy as np
|
||||
import dotenv
|
||||
from pathlib import Path
|
||||
import os
|
||||
import time
|
||||
|
||||
# import hjson as json
|
||||
import torch
|
||||
|
@ -18,11 +16,14 @@ from .utils import utils
|
|||
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
args = None
|
||||
|
||||
object_names = {}
|
||||
objects_and_peoples = {
|
||||
"objects": {},
|
||||
"peoples": {},
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
global object_names
|
||||
global objects_and_peoples
|
||||
global args
|
||||
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
|
||||
|
||||
|
@ -77,6 +78,15 @@ def main():
|
|||
help="The object(s) to detect. Must be something the model is trained to detect",
|
||||
)
|
||||
|
||||
argparser.add_argument(
|
||||
"--faces-directory",
|
||||
default=os.environ["FACES_DIRECTORY"]
|
||||
if "FACES_DIRECTORY" in os.environ and os.environ["FACES_DIRECTORY"] != ""
|
||||
else "faces",
|
||||
type=str,
|
||||
help="The directory to store the faces. Should contain 1 subdirectory of images per person",
|
||||
)
|
||||
|
||||
stream_source = argparser.add_mutually_exclusive_group()
|
||||
stream_source.add_argument(
|
||||
"--url",
|
||||
|
@ -95,6 +105,10 @@ def main():
|
|||
help="The capture device to use. Can also be a url.",
|
||||
)
|
||||
|
||||
# Defaults for the stuff here and down are already set in notify.py.
|
||||
# Setting them here just means that argparse will display the default values as defualt
|
||||
# TODO: Perhaps just remove the default parameter and just add to the help message that the default is set is x
|
||||
|
||||
notifcation_services = argparser.add_argument_group("Notification Services")
|
||||
notifcation_services.add_argument(
|
||||
"--ntfy-url",
|
||||
|
@ -178,18 +192,18 @@ def main():
|
|||
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||
plot_boxes = []
|
||||
# Setup dictionary of object names
|
||||
if not object_names:
|
||||
if objects_and_peoples["objects"] == {} or objects_and_peoples["objects"] is None:
|
||||
for name in r.names.values():
|
||||
object_names[name] = {
|
||||
objects_and_peoples["objects"][name] = {
|
||||
"last_detection_time": None,
|
||||
"detection_duration": None,
|
||||
# "first_detection_time": None,
|
||||
"last_notification_time": None,
|
||||
}
|
||||
# Also, make sure that the objects to detect are in the list of object_names
|
||||
# Also, make sure that the objects to detect are in the list of objects_and_peoples
|
||||
# If it isn't, print a warning
|
||||
for obj in args.detect_object:
|
||||
if obj not in object_names:
|
||||
if obj not in objects_and_peoples:
|
||||
print(
|
||||
f"Warning: {obj} is not in the list of objects the model can detect!"
|
||||
)
|
||||
|
@ -228,79 +242,18 @@ def main():
|
|||
}
|
||||
)
|
||||
|
||||
# End goal: Send a notification when an object has been detected for 2 seconds in the past 15 seconds.
|
||||
# However, don't send a notification if the last notification was less than 15 seconds ago
|
||||
objects_and_peoples=notify.thing_detected(
|
||||
thing_name=class_id,
|
||||
objects_and_peoples=objects_and_peoples,
|
||||
detection_type="objects",
|
||||
detection_window=args.detection_window,
|
||||
detection_duration=args.detection_duration,
|
||||
notification_window=args.notification_window,
|
||||
ntfy_url=args.ntfy_url,
|
||||
)
|
||||
|
||||
# (re)start cycle
|
||||
if (
|
||||
# If the object has not been detected before
|
||||
object_names[class_id]["last_detection_time"] is None
|
||||
# If the last detection was more than 15 seconds ago
|
||||
or time.time() - object_names[class_id]["last_detection_time"]
|
||||
> args.detection_window
|
||||
):
|
||||
# Set the last detection time to now
|
||||
object_names[class_id]["last_detection_time"] = time.time()
|
||||
print(f"First detection of {class_id} in this detection window")
|
||||
# This line is important. It resets the detection duration when the object hasn't been detected for a while
|
||||
# If detection duration is None, don't print anything.
|
||||
# Otherwise, print that the detection duration is being reset due to inactivity
|
||||
if object_names[class_id]["detection_duration"] is not None:
|
||||
print(
|
||||
f"Resetting detection duration for {class_id} since it hasn't been detected for {args.detection_window} seconds" # noqa: E501
|
||||
)
|
||||
object_names[class_id]["detection_duration"] = 0
|
||||
else:
|
||||
# Check if the last notification was less than 15 seconds ago
|
||||
# If it was, then don't do anything
|
||||
if (
|
||||
time.time() - object_names[class_id]["last_detection_time"]
|
||||
<= args.notification_window
|
||||
):
|
||||
pass
|
||||
# If it was more than 15 seconds ago, reset the detection duration
|
||||
# This effectively resets the notification timer
|
||||
else:
|
||||
print("Notification timer has expired - resetting")
|
||||
object_names[class_id]["detection_duration"] = 0
|
||||
object_names[class_id]["detection_duration"] += (
|
||||
time.time() - object_names[class_id]["last_detection_time"]
|
||||
)
|
||||
# print("Updating detection duration")
|
||||
object_names[class_id]["last_detection_time"] = time.time()
|
||||
|
||||
# (re)send notification
|
||||
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
|
||||
if (
|
||||
object_names[class_id]["detection_duration"]
|
||||
>= args.detection_duration
|
||||
and time.time() - object_names[class_id]["last_detection_time"]
|
||||
<= args.detection_window
|
||||
):
|
||||
# If the last notification was more than 15 seconds ago, then send a notification
|
||||
if (
|
||||
object_names[class_id]["last_notification_time"] is None
|
||||
or time.time()
|
||||
- object_names[class_id]["last_notification_time"]
|
||||
> args.notification_window
|
||||
):
|
||||
object_names[class_id]["last_notification_time"] = time.time()
|
||||
print(
|
||||
f"Detected {class_id} for {args.detection_duration} seconds"
|
||||
)
|
||||
headers = notify.construct_ntfy_headers(
|
||||
title=f"{class_id} detected",
|
||||
tag="rotating_light",
|
||||
priority="default",
|
||||
)
|
||||
notify.send_notification(
|
||||
data=f"{class_id} detected for {args.detection_duration} seconds",
|
||||
headers=headers,
|
||||
url=args.ntfy_url,
|
||||
)
|
||||
# Reset the detection duration
|
||||
print("Just sent a notification - resetting detection duration")
|
||||
object_names[class_id]["detection_duration"] = 0
|
||||
# TODO: On 10-14-2023, while testing, it seemed the bounding box was too low. Troubleshoot if it's a plotting problem.
|
||||
# To do so, use r.plot() to cross reference the bounding box drawn by the plot_label function and r.plot()
|
||||
frame_to_show = utils.plot_label(
|
||||
boxes=plot_boxes,
|
||||
full_frame=frame,
|
||||
|
|
|
@ -1,5 +1,122 @@
|
|||
import datetime
|
||||
import httpx
|
||||
import time
|
||||
|
||||
|
||||
'''
|
||||
Structure of objects_and_peoples
|
||||
Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated.
|
||||
TODO: Make a function to check if a person is in the objects dictionary and vice versa
|
||||
{
|
||||
"objects": {
|
||||
"object_name": {
|
||||
"last_detection_time": float,
|
||||
"detection_duration": float,
|
||||
"last_notification_time": float,
|
||||
},
|
||||
},
|
||||
"peoples": {
|
||||
"person_name": {
|
||||
"last_detection_time": float,
|
||||
"detection_duration": float,
|
||||
"last_notification_time": float,
|
||||
},
|
||||
},
|
||||
}
|
||||
'''
|
||||
# objects_and_peoples = {}
|
||||
|
||||
|
||||
def thing_detected(
|
||||
thing_name: str,
|
||||
objects_and_peoples: dict,
|
||||
detection_type: str = "objects",
|
||||
detection_window: int = 15,
|
||||
detection_duration: int = 2,
|
||||
notification_window: int = 15,
|
||||
ntfy_url: str = "https://ntfy.sh/set-detect-notify"
|
||||
) -> dict:
|
||||
'''
|
||||
A function to make sure 2 seconds of detection is detected in 15 seconds, 15 seconds apart.
|
||||
Takes a dict that will be retured with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY
|
||||
'''
|
||||
|
||||
# "Alias" the objects and peoples dictionaries so it's easier to work with
|
||||
respective_type = objects_and_peoples[detection_type]
|
||||
|
||||
# (re)start cycle
|
||||
if (
|
||||
# If the object has not been detected before
|
||||
respective_type[thing_name]["last_detection_time"] is None
|
||||
# If the last detection was more than 15 seconds ago
|
||||
or time.time() - respective_type[thing_name]["last_detection_time"]
|
||||
> detection_window
|
||||
):
|
||||
# Set the last detection time to now
|
||||
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||
print(f"First detection of {thing_name} in this detection window")
|
||||
# This line is important. It resets the detection duration when the object hasn't been detected for a while
|
||||
# If detection duration is None, don't print anything.
|
||||
# Otherwise, print that the detection duration is being reset due to inactivity
|
||||
if respective_type[thing_name]["detection_duration"] is not None:
|
||||
print(
|
||||
f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds" # noqa: E501
|
||||
)
|
||||
respective_type[thing_name]["detection_duration"] = 0
|
||||
else:
|
||||
# Check if the last notification was less than 15 seconds ago
|
||||
# If it was, then don't do anything
|
||||
if (
|
||||
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||
<= notification_window
|
||||
):
|
||||
pass
|
||||
# If it was more than 15 seconds ago, reset the detection duration
|
||||
# This effectively resets the notification timer
|
||||
else:
|
||||
print("Notification timer has expired - resetting")
|
||||
respective_type[thing_name]["detection_duration"] = 0
|
||||
respective_type[thing_name]["detection_duration"] += (
|
||||
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||
)
|
||||
# print("Updating detection duration")
|
||||
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||
|
||||
# (re)send notification
|
||||
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
|
||||
if (
|
||||
respective_type[thing_name]["detection_duration"]
|
||||
>= detection_duration
|
||||
and time.time() - respective_type[thing_name]["last_detection_time"]
|
||||
<= detection_window
|
||||
):
|
||||
# If the last notification was more than 15 seconds ago, then send a notification
|
||||
if (
|
||||
respective_type[thing_name]["last_notification_time"] is None
|
||||
or time.time()
|
||||
- respective_type[thing_name]["last_notification_time"]
|
||||
> notification_window
|
||||
):
|
||||
respective_type[thing_name]["last_notification_time"] = time.time()
|
||||
print(
|
||||
f"Detected {thing_name} for {detection_duration} seconds"
|
||||
)
|
||||
headers = construct_ntfy_headers(
|
||||
title=f"{thing_name} detected",
|
||||
tag="rotating_light",
|
||||
priority="default",
|
||||
)
|
||||
send_notification(
|
||||
data=f"{thing_name} detected for {detection_duration} seconds",
|
||||
headers=headers,
|
||||
url=ntfy_url,
|
||||
)
|
||||
# Reset the detection duration
|
||||
print("Just sent a notification - resetting detection duration")
|
||||
respective_type[thing_name]["detection_duration"] = 0
|
||||
|
||||
# Take the aliased objects_and_peoples and update the respective dictionary
|
||||
objects_and_peoples[detection_type] = respective_type
|
||||
return objects_and_peoples
|
||||
|
||||
|
||||
def construct_ntfy_headers(
|
||||
|
@ -15,18 +132,3 @@ def send_notification(data: str, headers: dict, url: str):
|
|||
raise ValueError("url and data cannot be None")
|
||||
httpx.post(url, data=data.encode("utf-8"), headers=headers)
|
||||
|
||||
|
||||
def check_last_seen(last_seen: datetime.datetime, seconds: int = 15):
|
||||
"""
|
||||
Check if a time is older than a given number of seconds
|
||||
If it is, return True
|
||||
If last_seen is empty/null, return True
|
||||
"""
|
||||
if (
|
||||
datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds)
|
||||
or last_seen == ""
|
||||
or last_seen is None
|
||||
):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from pathlib import Path
|
||||
from deepface import DeepFace
|
||||
|
||||
def plot_label(
|
||||
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||
|
@ -55,3 +56,57 @@ def plot_label(
|
|||
1,
|
||||
)
|
||||
return view_frame
|
||||
|
||||
|
||||
def recognize_face(
|
||||
path_to_directory: Path = Path("faces"),
|
||||
# opencv image
|
||||
run_frame: np.ndarray = None,
|
||||
) -> np.ndarray:
|
||||
'''
|
||||
Accepts a path to a directory of images of faces to be used as a refference
|
||||
In addition, accepts an opencv image to be used as the frame to be searched
|
||||
|
||||
Returns a list of dictionaries, containing a single dictonary as currently only 1 face can be detected in each frame
|
||||
dict contains the following keys: label, x1, y1, x2, y2
|
||||
The directory should be structured as follows:
|
||||
faces/
|
||||
name/
|
||||
image1.jpg
|
||||
image2.jpg
|
||||
image3.jpg
|
||||
name2/
|
||||
image1.jpg
|
||||
image2.jpg
|
||||
image3.jpg
|
||||
(not neccessarily jpgs, but you get the idea)
|
||||
|
||||
Point is, `name` is the name of the person in the images in the directory `name`
|
||||
That name will be used as the label for the face in the frame
|
||||
'''
|
||||
# face_dataframes is a vanilla list of dataframes
|
||||
face_dataframes = DeepFace.find(run_frame, db_path=str(path_to_directory))
|
||||
# Iteate over the dataframes
|
||||
for df in face_dataframes:
|
||||
# The last row is the highest confidence
|
||||
# So we can just grab the path from there
|
||||
# iloc = Integer LOCation
|
||||
path_to_image = Path(df.iloc[-1]["identity"])
|
||||
# Get the name of the parent directory
|
||||
label = path_to_image.parent.name
|
||||
# Return the coordinates of the box in xyxy format, rather than xywh
|
||||
# This is because YOLO uses xyxy, and that's how plot_label expects
|
||||
# Also, xyxy is just the top left and bottom right corners of the box
|
||||
coordinates = {
|
||||
"x1": df.iloc[-1]["source_x"],
|
||||
"y1": df.iloc[-1]["source_y"],
|
||||
"x2": df.iloc[-1]["source_x"] + df.iloc[-1]["source_w"],
|
||||
"y2": df.iloc[-1]["source_y"] + df.iloc[-1]["source_h"],
|
||||
}
|
||||
|
||||
return [dict(label=label, **coordinates)]
|
||||
|
||||
'''
|
||||
Example dataframe, for reference
|
||||
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
|
||||
'''
|
Loading…
Reference in New Issue