commit
7d942ee456
|
@ -2,5 +2,7 @@
|
||||||
config/
|
config/
|
||||||
using_yolov8.ipynb
|
using_yolov8.ipynb
|
||||||
yolov8n.pt
|
yolov8n.pt
|
||||||
|
.venv/
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
faces/*
|
||||||
|
!faces/.gitkeep
|
|
@ -8,7 +8,7 @@
|
||||||
"name": "Python: Module",
|
"name": "Python: Module",
|
||||||
"type": "python",
|
"type": "python",
|
||||||
"request": "launch",
|
"request": "launch",
|
||||||
"module": "src",
|
"module": "set_detect_notify",
|
||||||
"justMyCode": true
|
"justMyCode": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -0,0 +1,95 @@
|
||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from deepface import DeepFace\n",
|
||||||
|
"import cv2\n",
|
||||||
|
"from pathlib import Path\n",
|
||||||
|
"import uuid\n",
|
||||||
|
"import pandas as pd"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Take pictures"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# Take a picture using opencv with <uuid>.jpg\n",
|
||||||
|
"# Then delete it after\n",
|
||||||
|
"cap = cv2.VideoCapture(0)\n",
|
||||||
|
"ret, frame = cap.read()\n",
|
||||||
|
"cap.release()\n",
|
||||||
|
"# uuid_str = str(uuid.uuid4())\n",
|
||||||
|
"# uuid_path = Path(uuid_str + \".jpg\")\n",
|
||||||
|
"# cv2.imwrite(str(uuid_path), frame)\n",
|
||||||
|
"# dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
|
||||||
|
"# Don't throw an error if no face is detected (enforce_detection=False)\n",
|
||||||
|
"dfs = DeepFace.find(frame, db_path = \"faces\", enforce_detection=False)\n",
|
||||||
|
"# Get the identity of the person\n",
|
||||||
|
"for i, pd_dataframe in enumerate(dfs):\n",
|
||||||
|
" # Sort the dataframe by confidence\n",
|
||||||
|
" # inplace=True means that the dataframe is modified so we don't need to assign it to a new variable\n",
|
||||||
|
" pd_dataframe.sort_values(by=['VGG-Face_cosine'], inplace=True, ascending=False)\n",
|
||||||
|
" print(f'On dataframe {i}')\n",
|
||||||
|
" print(pd_dataframe)\n",
|
||||||
|
" # Get the most likely identity\n",
|
||||||
|
" # print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
|
||||||
|
" # We could use Path to get the parent directory of the image to use as the identity\n",
|
||||||
|
" print(f'Most likely identity: {Path(pd_dataframe.iloc[0][\"identity\"]).parent.name}')\n",
|
||||||
|
" # Get the most likely identity's confidence\n",
|
||||||
|
" print(f'Confidence: {pd_dataframe.iloc[0][\"VGG-Face_cosine\"]}')\n",
|
||||||
|
"\n",
|
||||||
|
"# uuid_path.unlink()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Stream"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"DeepFace.stream(db_path=\"faces\")"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": ".venv",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.10.5"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,58 +1,36 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "set-detect-notify"
|
name = "set_detect_notify"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
description = "Detect all the things"
|
description = "Detect all the things"
|
||||||
authors = ["slashtechno <77907286+slashtechno@users.noreply.github.com>"]
|
authors = ["slashtechno <77907286+slashtechno@users.noreply.github.com>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
packages = [{include = "set-detect-notify"}]
|
packages = [{include = "set_detect_notify"}]
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.10"
|
# python = "^3.10"
|
||||||
|
python = ">=3.10, <3.12"
|
||||||
python-dotenv = "^1.0.0"
|
python-dotenv = "^1.0.0"
|
||||||
httpx = "^0.25.0"
|
httpx = "^0.25.0"
|
||||||
opencv-python = "^4.8.1.78"
|
opencv-python = "^4.8.1.78"
|
||||||
ultralytics = "^8.0.190"
|
ultralytics = "^8.0.190"
|
||||||
hjson = "^3.1.0"
|
hjson = "^3.1.0"
|
||||||
numpy = "^1.23.2"
|
numpy = "^1.23.2"
|
||||||
# torch = [
|
|
||||||
# { version = "^2.0.0+cu118", source = "torch_cu118", markers = "extra=='cuda'" },
|
# https://github.com/python-poetry/poetry/issues/6409
|
||||||
# { version = "^2.0.0+cpu", source = "torch_cpu", markers = "extra!='cuda'" },
|
|
||||||
# ]
|
|
||||||
# torchaudio = [
|
|
||||||
# { version = "^2.0.0+cu118", source = "torch_cu118", markers = "extra=='cuda'" },
|
|
||||||
# { version = "^2.0.0+cpu", source = "torch_cpu", markers = "extra!='cuda'" },
|
|
||||||
# ]
|
|
||||||
# torchvision = [
|
|
||||||
# { version = "^0.15+cu118", source = "torch_cu118", markers = "extra=='cuda'" },
|
|
||||||
# { version = "^0.15+cpu", source = "torch_cpu", markers = "extra!='cuda'" },
|
|
||||||
# ]
|
|
||||||
torch = "^2.1.0"
|
torch = "^2.1.0"
|
||||||
|
|
||||||
|
tensorflow-io-gcs-filesystem = "0.31.0"
|
||||||
|
deepface = "^0.0.79"
|
||||||
|
|
||||||
|
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
black = "^23.9.1"
|
black = "^23.9.1"
|
||||||
ruff = "^0.0.291"
|
ruff = "^0.0.291"
|
||||||
ipykernel = "^6.25.2"
|
ipykernel = "^6.25.2"
|
||||||
|
nbconvert = "^7.9.2"
|
||||||
|
|
||||||
|
|
||||||
# [[tool.poetry.source]]
|
|
||||||
# name = "torch_cpu"
|
|
||||||
# url = "https://download.pytorch.org/whl/cpu"
|
|
||||||
# priority = "supplemental"
|
|
||||||
#
|
|
||||||
# [[tool.poetry.source]]
|
|
||||||
# name = "torch_cu118"
|
|
||||||
# url = "https://download.pytorch.org/whl/cu118"
|
|
||||||
# priority = "supplemental"
|
|
||||||
#
|
|
||||||
# [tool.poetry.extras]
|
|
||||||
# cuda = []
|
|
||||||
#
|
|
||||||
# [[tool.poetry.source]]
|
|
||||||
# name = "PyPI"
|
|
||||||
# priority = "primary"
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["poetry-core"]
|
requires = ["poetry-core"]
|
||||||
build-backend = "poetry.core.masonry.api"
|
build-backend = "poetry.core.masonry.api"
|
||||||
|
@ -60,4 +38,9 @@ build-backend = "poetry.core.masonry.api"
|
||||||
|
|
||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
# More than the default (88) of `black` to make comments less of a headache
|
# More than the default (88) of `black` to make comments less of a headache
|
||||||
line-length = 120
|
# Where possible, `black` will attempt to format to 88 characters
|
||||||
|
# However, setting ruff to 135 will allow for longer lines that can't be auto-formatted
|
||||||
|
line-length = 135
|
||||||
|
|
||||||
|
[tool.poetry.scripts]
|
||||||
|
set-detect-notify = "set_detect_notify.__main__:main"
|
|
@ -1,32 +0,0 @@
|
||||||
import datetime
|
|
||||||
import httpx
|
|
||||||
|
|
||||||
|
|
||||||
def construct_ntfy_headers(
|
|
||||||
title: str = "Object/Person Detected",
|
|
||||||
tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
|
|
||||||
priority="default", # https://docs.ntfy.sh/publish/#message-priority
|
|
||||||
) -> dict:
|
|
||||||
return {"Title": title, "Priority": priority, "Tags": tag}
|
|
||||||
|
|
||||||
|
|
||||||
def send_notification(data: str, headers: dict, url: str):
|
|
||||||
if url is None or data is None:
|
|
||||||
raise ValueError("url and data cannot be None")
|
|
||||||
httpx.post(url, data=data.encode("utf-8"), headers=headers)
|
|
||||||
|
|
||||||
|
|
||||||
def check_last_seen(last_seen: datetime.datetime, seconds: int = 15):
|
|
||||||
"""
|
|
||||||
Check if a time is older than a given number of seconds
|
|
||||||
If it is, return True
|
|
||||||
If last_seen is empty/null, return True
|
|
||||||
"""
|
|
||||||
if (
|
|
||||||
datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds)
|
|
||||||
or last_seen == ""
|
|
||||||
or last_seen is None
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
|
@ -1,57 +0,0 @@
|
||||||
import cv2
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
def plot_label(
|
|
||||||
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
|
||||||
boxes: list = None,
|
|
||||||
# opencv image
|
|
||||||
full_frame: np.ndarray = None,
|
|
||||||
# run_scale is the scale of the image that was used to run the model
|
|
||||||
# So the coordinates will be scaled up to the view frame size
|
|
||||||
run_scale: float = None,
|
|
||||||
# view_scale is the scale of the image, in relation to the full frame
|
|
||||||
# So the coordinates will be scaled appropriately when coming from run_frame
|
|
||||||
view_scale: float = None,
|
|
||||||
font: int = cv2.FONT_HERSHEY_SIMPLEX,
|
|
||||||
):
|
|
||||||
view_frame = cv2.resize(full_frame, (0, 0), fx=view_scale, fy=view_scale)
|
|
||||||
for thing in boxes:
|
|
||||||
cv2.rectangle(
|
|
||||||
# Image
|
|
||||||
view_frame,
|
|
||||||
# Start point
|
|
||||||
(
|
|
||||||
int(thing["x1"] * (run_scale / view_scale)),
|
|
||||||
int(thing["y1"] * (run_scale / view_scale)),
|
|
||||||
),
|
|
||||||
# End point
|
|
||||||
(
|
|
||||||
int(thing["x2"] * (run_scale / view_scale)),
|
|
||||||
int(thing["y2"] * (run_scale / view_scale)),
|
|
||||||
),
|
|
||||||
# Color
|
|
||||||
(0, 255, 0),
|
|
||||||
# Thickness
|
|
||||||
2,
|
|
||||||
)
|
|
||||||
cv2.putText(
|
|
||||||
# Image
|
|
||||||
view_frame,
|
|
||||||
# Text
|
|
||||||
thing["label"],
|
|
||||||
# Origin
|
|
||||||
(
|
|
||||||
int(thing["x1"] * (run_scale / view_scale)),
|
|
||||||
int(thing["y1"] * (run_scale / view_scale)),
|
|
||||||
),
|
|
||||||
# Font
|
|
||||||
font,
|
|
||||||
# Font Scale
|
|
||||||
1,
|
|
||||||
# Color
|
|
||||||
(0, 255, 0),
|
|
||||||
# Thickness
|
|
||||||
1,
|
|
||||||
)
|
|
||||||
return view_frame
|
|
|
@ -1,10 +1,8 @@
|
||||||
# import face_recognition
|
# import face_recognition
|
||||||
import cv2
|
import cv2
|
||||||
import numpy as np
|
|
||||||
import dotenv
|
import dotenv
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
|
|
||||||
# import hjson as json
|
# import hjson as json
|
||||||
import torch
|
import torch
|
||||||
|
@ -18,11 +16,14 @@ from .utils import utils
|
||||||
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||||
args = None
|
args = None
|
||||||
|
|
||||||
object_names = {}
|
objects_and_peoples = {
|
||||||
|
"objects": {},
|
||||||
|
"peoples": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
global object_names
|
global objects_and_peoples
|
||||||
global args
|
global args
|
||||||
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
|
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
|
||||||
|
|
||||||
|
@ -45,7 +46,8 @@ def main():
|
||||||
# Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25
|
# Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25
|
||||||
default=os.environ["RUN_SCALE"]
|
default=os.environ["RUN_SCALE"]
|
||||||
if "RUN_SCALE" in os.environ and os.environ["RUN_SCALE"] != ""
|
if "RUN_SCALE" in os.environ and os.environ["RUN_SCALE"] != ""
|
||||||
else 0.25, # noqa: E501
|
# else 0.25,
|
||||||
|
else 1,
|
||||||
type=float,
|
type=float,
|
||||||
help="The scale to run the detection at, default is 0.25",
|
help="The scale to run the detection at, default is 0.25",
|
||||||
)
|
)
|
||||||
|
@ -54,7 +56,8 @@ def main():
|
||||||
# Set it to the env VIEW_SCALE if it isn't blank, otherwise set it to 0.75
|
# Set it to the env VIEW_SCALE if it isn't blank, otherwise set it to 0.75
|
||||||
default=os.environ["VIEW_SCALE"]
|
default=os.environ["VIEW_SCALE"]
|
||||||
if "VIEW_SCALE" in os.environ and os.environ["VIEW_SCALE"] != ""
|
if "VIEW_SCALE" in os.environ and os.environ["VIEW_SCALE"] != ""
|
||||||
else 0.75, # noqa: E501
|
# else 0.75,
|
||||||
|
else 1,
|
||||||
type=float,
|
type=float,
|
||||||
help="The scale to view the detection at, default is 0.75",
|
help="The scale to view the detection at, default is 0.75",
|
||||||
)
|
)
|
||||||
|
@ -77,6 +80,15 @@ def main():
|
||||||
help="The object(s) to detect. Must be something the model is trained to detect",
|
help="The object(s) to detect. Must be something the model is trained to detect",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
argparser.add_argument(
|
||||||
|
"--faces-directory",
|
||||||
|
default=os.environ["FACES_DIRECTORY"]
|
||||||
|
if "FACES_DIRECTORY" in os.environ and os.environ["FACES_DIRECTORY"] != ""
|
||||||
|
else "faces",
|
||||||
|
type=str,
|
||||||
|
help="The directory to store the faces. Should contain 1 subdirectory of images per person",
|
||||||
|
)
|
||||||
|
|
||||||
stream_source = argparser.add_mutually_exclusive_group()
|
stream_source = argparser.add_mutually_exclusive_group()
|
||||||
stream_source.add_argument(
|
stream_source.add_argument(
|
||||||
"--url",
|
"--url",
|
||||||
|
@ -95,6 +107,10 @@ def main():
|
||||||
help="The capture device to use. Can also be a url.",
|
help="The capture device to use. Can also be a url.",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Defaults for the stuff here and down are already set in notify.py.
|
||||||
|
# Setting them here just means that argparse will display the default values as defualt
|
||||||
|
# TODO: Perhaps just remove the default parameter and just add to the help message that the default is set is x
|
||||||
|
|
||||||
notifcation_services = argparser.add_argument_group("Notification Services")
|
notifcation_services = argparser.add_argument_group("Notification Services")
|
||||||
notifcation_services.add_argument(
|
notifcation_services.add_argument(
|
||||||
"--ntfy-url",
|
"--ntfy-url",
|
||||||
|
@ -177,19 +193,39 @@ def main():
|
||||||
for i, r in enumerate(results):
|
for i, r in enumerate(results):
|
||||||
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||||
plot_boxes = []
|
plot_boxes = []
|
||||||
|
|
||||||
|
# The following is stuff for people
|
||||||
|
# This is still in the for loop as each result, no matter if anything is detected, will be present.
|
||||||
|
# Thus, there will always be one result (r)
|
||||||
|
if face_details := utils.recognize_face(path_to_directory=Path(args.faces_directory), run_frame=run_frame):
|
||||||
|
plot_boxes.append( face_details )
|
||||||
|
objects_and_peoples=notify.thing_detected(
|
||||||
|
thing_name=face_details["label"],
|
||||||
|
objects_and_peoples=objects_and_peoples,
|
||||||
|
detection_type="peoples",
|
||||||
|
detection_window=args.detection_window,
|
||||||
|
detection_duration=args.detection_duration,
|
||||||
|
notification_window=args.notification_window,
|
||||||
|
ntfy_url=args.ntfy_url,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# The following is stuff for objects
|
||||||
# Setup dictionary of object names
|
# Setup dictionary of object names
|
||||||
if not object_names:
|
if objects_and_peoples["objects"] == {} or objects_and_peoples["objects"] is None:
|
||||||
for name in r.names.values():
|
for name in r.names.values():
|
||||||
object_names[name] = {
|
objects_and_peoples["objects"][name] = {
|
||||||
"last_detection_time": None,
|
"last_detection_time": None,
|
||||||
"detection_duration": None,
|
"detection_duration": None,
|
||||||
# "first_detection_time": None,
|
# "first_detection_time": None,
|
||||||
"last_notification_time": None,
|
"last_notification_time": None,
|
||||||
}
|
}
|
||||||
# Also, make sure that the objects to detect are in the list of object_names
|
# Also, make sure that the objects to detect are in the list of objects_and_peoples
|
||||||
# If it isn't, print a warning
|
# If it isn't, print a warning
|
||||||
for obj in args.detect_object:
|
for obj in args.detect_object:
|
||||||
if obj not in object_names:
|
if obj not in objects_and_peoples:
|
||||||
print(
|
print(
|
||||||
f"Warning: {obj} is not in the list of objects the model can detect!"
|
f"Warning: {obj} is not in the list of objects the model can detect!"
|
||||||
)
|
)
|
||||||
|
@ -228,79 +264,18 @@ def main():
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# End goal: Send a notification when an object has been detected for 2 seconds in the past 15 seconds.
|
objects_and_peoples=notify.thing_detected(
|
||||||
# However, don't send a notification if the last notification was less than 15 seconds ago
|
thing_name=class_id,
|
||||||
|
objects_and_peoples=objects_and_peoples,
|
||||||
|
detection_type="objects",
|
||||||
|
detection_window=args.detection_window,
|
||||||
|
detection_duration=args.detection_duration,
|
||||||
|
notification_window=args.notification_window,
|
||||||
|
ntfy_url=args.ntfy_url,
|
||||||
|
)
|
||||||
|
|
||||||
# (re)start cycle
|
# TODO: On 10-14-2023, while testing, it seemed the bounding box was too low. Troubleshoot if it's a plotting problem.
|
||||||
if (
|
# To do so, use r.plot() to cross reference the bounding box drawn by the plot_label function and r.plot()
|
||||||
# If the object has not been detected before
|
|
||||||
object_names[class_id]["last_detection_time"] is None
|
|
||||||
# If the last detection was more than 15 seconds ago
|
|
||||||
or time.time() - object_names[class_id]["last_detection_time"]
|
|
||||||
> args.detection_window
|
|
||||||
):
|
|
||||||
# Set the last detection time to now
|
|
||||||
object_names[class_id]["last_detection_time"] = time.time()
|
|
||||||
print(f"First detection of {class_id} in this detection window")
|
|
||||||
# This line is important. It resets the detection duration when the object hasn't been detected for a while
|
|
||||||
# If detection duration is None, don't print anything.
|
|
||||||
# Otherwise, print that the detection duration is being reset due to inactivity
|
|
||||||
if object_names[class_id]["detection_duration"] is not None:
|
|
||||||
print(
|
|
||||||
f"Resetting detection duration for {class_id} since it hasn't been detected for {args.detection_window} seconds" # noqa: E501
|
|
||||||
)
|
|
||||||
object_names[class_id]["detection_duration"] = 0
|
|
||||||
else:
|
|
||||||
# Check if the last notification was less than 15 seconds ago
|
|
||||||
# If it was, then don't do anything
|
|
||||||
if (
|
|
||||||
time.time() - object_names[class_id]["last_detection_time"]
|
|
||||||
<= args.notification_window
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
# If it was more than 15 seconds ago, reset the detection duration
|
|
||||||
# This effectively resets the notification timer
|
|
||||||
else:
|
|
||||||
print("Notification timer has expired - resetting")
|
|
||||||
object_names[class_id]["detection_duration"] = 0
|
|
||||||
object_names[class_id]["detection_duration"] += (
|
|
||||||
time.time() - object_names[class_id]["last_detection_time"]
|
|
||||||
)
|
|
||||||
# print("Updating detection duration")
|
|
||||||
object_names[class_id]["last_detection_time"] = time.time()
|
|
||||||
|
|
||||||
# (re)send notification
|
|
||||||
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
|
|
||||||
if (
|
|
||||||
object_names[class_id]["detection_duration"]
|
|
||||||
>= args.detection_duration
|
|
||||||
and time.time() - object_names[class_id]["last_detection_time"]
|
|
||||||
<= args.detection_window
|
|
||||||
):
|
|
||||||
# If the last notification was more than 15 seconds ago, then send a notification
|
|
||||||
if (
|
|
||||||
object_names[class_id]["last_notification_time"] is None
|
|
||||||
or time.time()
|
|
||||||
- object_names[class_id]["last_notification_time"]
|
|
||||||
> args.notification_window
|
|
||||||
):
|
|
||||||
object_names[class_id]["last_notification_time"] = time.time()
|
|
||||||
print(
|
|
||||||
f"Detected {class_id} for {args.detection_duration} seconds"
|
|
||||||
)
|
|
||||||
headers = notify.construct_ntfy_headers(
|
|
||||||
title=f"{class_id} detected",
|
|
||||||
tag="rotating_light",
|
|
||||||
priority="default",
|
|
||||||
)
|
|
||||||
notify.send_notification(
|
|
||||||
data=f"{class_id} detected for {args.detection_duration} seconds",
|
|
||||||
headers=headers,
|
|
||||||
url=args.ntfy_url,
|
|
||||||
)
|
|
||||||
# Reset the detection duration
|
|
||||||
print("Just sent a notification - resetting detection duration")
|
|
||||||
object_names[class_id]["detection_duration"] = 0
|
|
||||||
frame_to_show = utils.plot_label(
|
frame_to_show = utils.plot_label(
|
||||||
boxes=plot_boxes,
|
boxes=plot_boxes,
|
||||||
full_frame=frame,
|
full_frame=frame,
|
||||||
|
@ -322,5 +297,5 @@ def main():
|
||||||
video_capture.release()
|
video_capture.release()
|
||||||
cv2.destroyAllWindows()
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
|
@ -0,0 +1,143 @@
|
||||||
|
import httpx
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
Structure of objects_and_peoples
|
||||||
|
Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated.
|
||||||
|
TODO: Make a function to check if a person is in the objects dictionary and vice versa
|
||||||
|
{
|
||||||
|
"objects": {
|
||||||
|
"object_name": {
|
||||||
|
"last_detection_time": float,
|
||||||
|
"detection_duration": float,
|
||||||
|
"last_notification_time": float,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"peoples": {
|
||||||
|
"person_name": {
|
||||||
|
"last_detection_time": float,
|
||||||
|
"detection_duration": float,
|
||||||
|
"last_notification_time": float,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
# objects_and_peoples = {}
|
||||||
|
|
||||||
|
|
||||||
|
def thing_detected(
|
||||||
|
thing_name: str,
|
||||||
|
objects_and_peoples: dict,
|
||||||
|
detection_type: str = "objects",
|
||||||
|
detection_window: int = 15,
|
||||||
|
detection_duration: int = 2,
|
||||||
|
notification_window: int = 15,
|
||||||
|
ntfy_url: str = "https://ntfy.sh/set-detect-notify"
|
||||||
|
) -> dict:
|
||||||
|
'''
|
||||||
|
A function to make sure 2 seconds of detection is detected in 15 seconds, 15 seconds apart.
|
||||||
|
Takes a dict that will be retured with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY
|
||||||
|
'''
|
||||||
|
|
||||||
|
# "Alias" the objects and peoples dictionaries so it's easier to work with
|
||||||
|
respective_type = objects_and_peoples[detection_type]
|
||||||
|
|
||||||
|
# (re)start cycle
|
||||||
|
try:
|
||||||
|
if (
|
||||||
|
# If the object has not been detected before
|
||||||
|
respective_type[thing_name]["last_detection_time"] is None
|
||||||
|
# If the last detection was more than 15 seconds ago
|
||||||
|
or time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
> detection_window
|
||||||
|
):
|
||||||
|
# Set the last detection time to now
|
||||||
|
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||||
|
print(f"First detection of {thing_name} in this detection window")
|
||||||
|
# This line is important. It resets the detection duration when the object hasn't been detected for a while
|
||||||
|
# If detection duration is None, don't print anything.
|
||||||
|
# Otherwise, print that the detection duration is being reset due to inactivity
|
||||||
|
if respective_type[thing_name]["detection_duration"] is not None:
|
||||||
|
print(
|
||||||
|
f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds" # noqa: E501
|
||||||
|
)
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
else:
|
||||||
|
# Check if the last NOTIFICATION was less than 15 seconds ago
|
||||||
|
# If it was, then don't do anything
|
||||||
|
if (
|
||||||
|
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
<= notification_window
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
# If it was more than 15 seconds ago, reset the detection duration
|
||||||
|
# This effectively resets the notification timer
|
||||||
|
else:
|
||||||
|
print("Notification timer has expired - resetting")
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
respective_type[thing_name]["detection_duration"] += (
|
||||||
|
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
)
|
||||||
|
# print("Updating detection duration")
|
||||||
|
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||||
|
except KeyError:
|
||||||
|
# If the object has not been detected before
|
||||||
|
respective_type[thing_name] = {
|
||||||
|
"last_detection_time": time.time(),
|
||||||
|
"detection_duration": 0,
|
||||||
|
"last_notification_time": None,
|
||||||
|
}
|
||||||
|
print(f"First detection of {thing_name} ever")
|
||||||
|
|
||||||
|
# (re)send notification
|
||||||
|
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
|
||||||
|
if (
|
||||||
|
respective_type[thing_name]["detection_duration"]
|
||||||
|
>= detection_duration
|
||||||
|
and time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
<= detection_window
|
||||||
|
):
|
||||||
|
# If the last notification was more than 15 seconds ago, then send a notification
|
||||||
|
if (
|
||||||
|
respective_type[thing_name]["last_notification_time"] is None
|
||||||
|
or time.time()
|
||||||
|
- respective_type[thing_name]["last_notification_time"]
|
||||||
|
> notification_window
|
||||||
|
):
|
||||||
|
respective_type[thing_name]["last_notification_time"] = time.time()
|
||||||
|
print(
|
||||||
|
f"Detected {thing_name} for {detection_duration} seconds"
|
||||||
|
)
|
||||||
|
headers = construct_ntfy_headers(
|
||||||
|
title=f"{thing_name} detected",
|
||||||
|
tag="rotating_light",
|
||||||
|
priority="default",
|
||||||
|
)
|
||||||
|
send_notification(
|
||||||
|
data=f"{thing_name} detected for {detection_duration} seconds",
|
||||||
|
headers=headers,
|
||||||
|
url=ntfy_url,
|
||||||
|
)
|
||||||
|
# Reset the detection duration
|
||||||
|
print("Just sent a notification - resetting detection duration")
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
|
||||||
|
# Take the aliased objects_and_peoples and update the respective dictionary
|
||||||
|
objects_and_peoples[detection_type] = respective_type
|
||||||
|
return objects_and_peoples
|
||||||
|
|
||||||
|
|
||||||
|
def construct_ntfy_headers(
|
||||||
|
title: str = "Object/Person Detected",
|
||||||
|
tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
|
||||||
|
priority="default", # https://docs.ntfy.sh/publish/#message-priority
|
||||||
|
) -> dict:
|
||||||
|
return {"Title": title, "Priority": priority, "Tags": tag}
|
||||||
|
|
||||||
|
|
||||||
|
def send_notification(data: str, headers: dict, url: str):
|
||||||
|
if url is None or data is None:
|
||||||
|
raise ValueError("url and data cannot be None")
|
||||||
|
httpx.post(url, data=data.encode("utf-8"), headers=headers)
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
from pathlib import Path
|
||||||
|
from deepface import DeepFace
|
||||||
|
|
||||||
|
first_face_try = True
|
||||||
|
|
||||||
|
def plot_label(
|
||||||
|
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||||
|
boxes: list = None,
|
||||||
|
# opencv image
|
||||||
|
full_frame: np.ndarray = None,
|
||||||
|
# run_scale is the scale of the image that was used to run the model
|
||||||
|
# So the coordinates will be scaled up to the view frame size
|
||||||
|
run_scale: float = None,
|
||||||
|
# view_scale is the scale of the image, in relation to the full frame
|
||||||
|
# So the coordinates will be scaled appropriately when coming from run_frame
|
||||||
|
view_scale: float = None,
|
||||||
|
font: int = cv2.FONT_HERSHEY_SIMPLEX,
|
||||||
|
):
|
||||||
|
view_frame = cv2.resize(full_frame, (0, 0), fx=view_scale, fy=view_scale)
|
||||||
|
for thing in boxes:
|
||||||
|
cv2.rectangle(
|
||||||
|
# Image
|
||||||
|
view_frame,
|
||||||
|
# Start point
|
||||||
|
(
|
||||||
|
int(thing["x1"] * (run_scale / view_scale)),
|
||||||
|
int(thing["y1"] * (run_scale / view_scale)),
|
||||||
|
),
|
||||||
|
# End point
|
||||||
|
(
|
||||||
|
int(thing["x2"] * (run_scale / view_scale)),
|
||||||
|
int(thing["y2"] * (run_scale / view_scale)),
|
||||||
|
),
|
||||||
|
# Color
|
||||||
|
(0, 255, 0),
|
||||||
|
# Thickness
|
||||||
|
2,
|
||||||
|
)
|
||||||
|
cv2.putText(
|
||||||
|
# Image
|
||||||
|
view_frame,
|
||||||
|
# Text
|
||||||
|
thing["label"],
|
||||||
|
# Origin
|
||||||
|
(
|
||||||
|
int(thing["x1"] * (run_scale / view_scale)),
|
||||||
|
int(thing["y1"] * (run_scale / view_scale)),
|
||||||
|
),
|
||||||
|
# Font
|
||||||
|
font,
|
||||||
|
# Font Scale
|
||||||
|
1,
|
||||||
|
# Color
|
||||||
|
(0, 255, 0),
|
||||||
|
# Thickness
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
return view_frame
|
||||||
|
|
||||||
|
|
||||||
|
def recognize_face(
|
||||||
|
path_to_directory: Path = Path("faces"),
|
||||||
|
# opencv image
|
||||||
|
run_frame: np.ndarray = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
'''
|
||||||
|
Accepts a path to a directory of images of faces to be used as a refference
|
||||||
|
In addition, accepts an opencv image to be used as the frame to be searched
|
||||||
|
|
||||||
|
Returns a single dictonary as currently only 1 face can be detected in each frame
|
||||||
|
dict contains the following keys: label, x1, y1, x2, y2
|
||||||
|
The directory should be structured as follows:
|
||||||
|
faces/
|
||||||
|
name/
|
||||||
|
image1.jpg
|
||||||
|
image2.jpg
|
||||||
|
image3.jpg
|
||||||
|
name2/
|
||||||
|
image1.jpg
|
||||||
|
image2.jpg
|
||||||
|
image3.jpg
|
||||||
|
(not neccessarily jpgs, but you get the idea)
|
||||||
|
|
||||||
|
Point is, `name` is the name of the person in the images in the directory `name`
|
||||||
|
That name will be used as the label for the face in the frame
|
||||||
|
'''
|
||||||
|
global first_face_try
|
||||||
|
|
||||||
|
# If it's the first time the function is being run, remove representations_vgg_face.pkl, if it exists
|
||||||
|
if first_face_try:
|
||||||
|
try:
|
||||||
|
Path("representations_vgg_face.pkl").unlink()
|
||||||
|
print("Removing representations_vgg_face.pkl")
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
first_face_try = False
|
||||||
|
|
||||||
|
# face_dataframes is a vanilla list of dataframes
|
||||||
|
try:
|
||||||
|
face_dataframes = DeepFace.find(run_frame, db_path=str(path_to_directory), enforce_detection=True, silent=True)
|
||||||
|
except ValueError as e:
|
||||||
|
if str(e) == "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False.":
|
||||||
|
return None
|
||||||
|
# Iteate over the dataframes
|
||||||
|
for df in face_dataframes:
|
||||||
|
# The last row is the highest confidence
|
||||||
|
# So we can just grab the path from there
|
||||||
|
# iloc = Integer LOCation
|
||||||
|
path_to_image = Path(df.iloc[-1]["identity"])
|
||||||
|
# Get the name of the parent directory
|
||||||
|
label = path_to_image.parent.name
|
||||||
|
# Return the coordinates of the box in xyxy format, rather than xywh
|
||||||
|
# This is because YOLO uses xyxy, and that's how plot_label expects
|
||||||
|
# Also, xyxy is just the top left and bottom right corners of the box
|
||||||
|
coordinates = {
|
||||||
|
"x1": df.iloc[-1]["source_x"],
|
||||||
|
"y1": df.iloc[-1]["source_y"],
|
||||||
|
"x2": df.iloc[-1]["source_x"] + df.iloc[-1]["source_w"],
|
||||||
|
"y2": df.iloc[-1]["source_y"] + df.iloc[-1]["source_h"],
|
||||||
|
}
|
||||||
|
distance = df.iloc[-1]["VGG-Face_cosine"]
|
||||||
|
# if 0.5 < distance < 0.7:
|
||||||
|
# label = "Unknown"
|
||||||
|
to_return = dict(label=label, **coordinates)
|
||||||
|
print(f'Confindence: {distance}, filname: {path_to_image.name}, to_return: {to_return}')
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
'''
|
||||||
|
Example dataframe, for reference
|
||||||
|
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
|
||||||
|
'''
|
Loading…
Reference in New Issue