Compare commits

..

No commits in common. "d56cee67515a27438552a537b6da8763543500ac" and "a9ab9db8929da492db4026543ba5e025ae995a0c" have entirely different histories.

9 changed files with 1034 additions and 1130 deletions

View File

@ -5,8 +5,6 @@ name = "python"
[analyzers.meta] [analyzers.meta]
runtime_version = "3.x.x" runtime_version = "3.x.x"
max_line_length = 135
[[analyzers]] [[analyzers]]
name = "docker" name = "docker"

View File

@ -1 +1 @@
3.10.12 3.10.5

12
.vscode/launch.json vendored
View File

@ -10,20 +10,10 @@
"request": "launch", "request": "launch",
"module": "wyzely_detect", "module": "wyzely_detect",
"args": [ "args": [
"--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--fake-second-source" "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations"
], ],
"justMyCode": true "justMyCode": true
}, },
// {
// "name": "Quick, Specific Debug",
// "type": "python",
// "request": "launch",
// "module": "wyzely_detect",
// "args": [
// "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--detect-object", "person", "--detect-object", "cell phone"
// ],
// "justMyCode": true
// },
{ {
// "name": "Python: Module", // "name": "Python: Module",
"name": "Full Debug", "name": "Full Debug",

View File

@ -16,9 +16,6 @@ Recognize faces/objects in a video stream (from a webcam or a security camera) a
- All RTSP feeds _should_ work, however. - All RTSP feeds _should_ work, however.
- Python 3.10 or 3.11 - Python 3.10 or 3.11
- Poetry (optional) - Poetry (optional)
- Windows or Linux
- I've tested this on MacOS - it works on my 2014 MacBook Air but not a 2011 MacBook Pro
- Both were upgraded with OpenCore, with the MacBook Air running Monterey and the MacBook Pro running a newer version of MacOS, which may have been the problem
### Docker ### Docker
- A Wyze Cam - A Wyze Cam
@ -49,7 +46,6 @@ This assumes you have Python 3.10 or 3.11 installed
#### Poetry #### Poetry
1. `poetry install` 1. `poetry install`
a. For GPU support, use `poetry install -E cuda --with gpu`
2. `poetry run -- wyzely-detect` 2. `poetry run -- wyzely-detect`
### Configuration ### Configuration
The following are some basic CLI options. Most flags have environment variable equivalents which can be helpful when using Docker. The following are some basic CLI options. Most flags have environment variable equivalents which can be helpful when using Docker.

1638
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -21,12 +21,11 @@ ultralytics = "^8.0.190"
hjson = "^3.1.0" hjson = "^3.1.0"
numpy = "^1.23.2" numpy = "^1.23.2"
# https://github.com/python-poetry/poetry/issues/6409#issuecomment-1911735833 # https://github.com/python-poetry/poetry/issues/6409
# To install with GPU, use poetry install -E cuda --with gpu torch = ">=2.0.0, !=2.0.1, !=2.1.0"
torch = {version = "2.1.*", source = "pytorch-cpu", markers = "extra!='cuda'" }
# https://stackoverflow.com/a/76477590/18270659 # https://stackoverflow.com/a/76477590/18270659
# https://discfuss.tensorflow.org/t/tensorflow-io-gcs-filesystem-with-windows/18849/4 # https://discuss.tensorflow.org/t/tensorflow-io-gcs-filesystem-with-windows/18849/4
# Might be able to remove this version constraint later # Might be able to remove this version constraint later
# Working versions: # Working versions:
# Python version 3.10.12 and 3.10.5 both work # Python version 3.10.12 and 3.10.5 both work
@ -34,33 +33,10 @@ torch = {version = "2.1.*", source = "pytorch-cpu", markers = "extra!='cuda'" }
# cuDNN version - 8.8.1 # cuDNN version - 8.8.1
# Installed from Nvidia website - nvidia-cuda-toolkit is not installed, but default PopOS drivers are installed # Installed from Nvidia website - nvidia-cuda-toolkit is not installed, but default PopOS drivers are installed
tensorflow-io-gcs-filesystem = "0.31.0" tensorflow-io-gcs-filesystem = "0.31.0"
tensorflow = {version = "^2.14.0", markers = "extra!='cuda'"} tensorflow = {version = "^2.14.0", extras = ["and-cuda"]}
deepface = "^0.0.79" deepface = "^0.0.79"
prettytable = "^3.9.0"
[tool.poetry.group.gpu]
optional = true
[tool.poetry.group.gpu.dependencies]
torch = {version = "2.1.*", source = "pytorch-cu121", markers = "extra=='cuda'"}
tensorflow = {version = "^2.14.0", extras = ["and-cuda"], markers = "extra=='cuda'"}
[tool.poetry.extras]
# Might be better to rename this to nocpu since it's more accurate
cuda = []
[[tool.poetry.source]]
name = "pytorch-cpu"
url = "https://download.pytorch.org/whl/cpu"
priority = "explicit"
[[tool.poetry.source]]
name = "pytorch-cu121"
url = "https://download.pytorch.org/whl/cu121"
priority = "explicit"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
black = "^23.9.1" black = "^23.9.1"

View File

@ -1,22 +1,28 @@
# import face_recognition # import face_recognition
from pathlib import Path from pathlib import Path
import os
import cv2 import cv2
import sys
from prettytable import PrettyTable
# import hjson as json # import hjson as json
import torch import torch
from ultralytics import YOLO from ultralytics import YOLO
from .utils import utils from .utils import notify, utils
from .utils.cli_args import argparser from .utils.cli_args import argparser
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
args = None args = None
objects_and_peoples = {
"objects": {},
"peoples": {},
}
def main(): def main():
global objects_and_peoples
global args global args
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
args = argparser.parse_args() args = argparser.parse_args()
@ -24,7 +30,7 @@ def main():
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168 # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
# Currently, I have been unable to set up Poetry to use GPU for Torch # Currently, I have been unable to set up Poetry to use GPU for Torch
for i in range(torch.cuda.device_count()): for i in range(torch.cuda.device_count()):
print(f"Using {torch.cuda.get_device_properties(i).name} for pytorch") print(f'Using {torch.cuda.get_device_properties(i).name} for pytorch')
if torch.cuda.is_available(): if torch.cuda.is_available():
torch.cuda.set_device(0) torch.cuda.set_device(0)
print("Set CUDA device") print("Set CUDA device")
@ -35,10 +41,9 @@ def main():
if args.force_disable_tensorflow_gpu: if args.force_disable_tensorflow_gpu:
print("Forcing tensorflow to use CPU") print("Forcing tensorflow to use CPU")
import tensorflow as tf import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
tf.config.set_visible_devices([], "GPU") if tf.config.experimental.list_logical_devices('GPU'):
if tf.config.experimental.list_logical_devices("GPU"): print('GPU disabled unsuccessfully')
print("GPU disabled unsuccessfully")
else: else:
print("GPU disabled successfully") print("GPU disabled successfully")
@ -46,89 +51,140 @@ def main():
# Depending on if the user wants to use a stream or a capture device, # Depending on if the user wants to use a stream or a capture device,
# Set the video capture to the appropriate source # Set the video capture to the appropriate source
if not args.rtsp_url and not args.capture_device: if args.rtsp_url is not None:
print("No stream or capture device set, defaulting to capture device 0") video_capture = cv2.VideoCapture(args.rtsp_url)
video_sources = {"devices": [cv2.VideoCapture(0)]}
else: else:
video_sources = { video_capture = cv2.VideoCapture(args.capture_device)
"streams": [cv2.VideoCapture(url) for url in args.rtsp_url],
"devices": [cv2.VideoCapture(device) for device in args.capture_device],
}
if args.fake_second_source:
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No capture device to use as second source. Trying stream.")
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No stream to use as a second source")
# When the code tries to resize the nonexistent capture device 1, the program will fail
# Eliminate lag by setting the buffer size to 1 # Eliminate lag by setting the buffer size to 1
# This makes it so that the video capture will only grab the most recent frame # This makes it so that the video capture will only grab the most recent frame
# However, this means that the video may be choppy # However, this means that the video may be choppy
# Only do this for streams video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
try:
for stream in video_sources["streams"]:
stream.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# If there are no streams, this will throw a KeyError
except KeyError:
pass
# Print out the resolution of the video sources. Ideally, change this so the device ID/url is also printed # Print the resolution of the video
pretty_table = PrettyTable(field_names=["Source Type", "Resolution"]) print(
for source_type, sources in video_sources.items(): f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}" # noqa: E501
for source in sources:
if (
source.get(cv2.CAP_PROP_FRAME_WIDTH) == 0
or source.get(cv2.CAP_PROP_FRAME_HEIGHT) == 0
):
message = "Capture for a source failed as resolution is 0x0.\n"
if source_type == "streams":
message += "Check if the stream URL is correct and if the stream is online."
else:
message += "Check if the capture device is connected, working, and not in use by another program."
print(message)
sys.exit(1)
pretty_table.add_row(
[
source_type,
f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}",
]
) )
print(pretty_table)
print("Beginning video capture...") print("Beginning video capture...")
while True: while True:
# Grab a single frame of video # Grab a single frame of video
frames = [] ret, frame = video_capture.read()
# frames = [source.read() for sources in video_sources.values() for source in sources] # Resize frame of video to a smaller size for faster recognition processing
for list_of_sources in video_sources.values(): run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale)
frames.extend([source.read()[1] for source in list_of_sources]) # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
frames_to_show = []
for frame in frames: results = model(run_frame, verbose=False)
frames_to_show.append(
utils.process_footage( path_to_faces = Path(args.faces_directory)
frame=frame, path_to_faces_exists = path_to_faces.is_dir()
run_scale=args.run_scale,
view_scale=args.view_scale, for i, r in enumerate(results):
faces_directory=Path(args.faces_directory), # list of dicts with each dict containing a label, x1, y1, x2, y2
face_confidence_threshold=args.face_confidence_threshold, plot_boxes = []
# The following is stuff for people
# This is still in the for loop as each result, no matter if anything is detected, will be present.
# Thus, there will always be one result (r)
# Only run if path_to_faces exists
# May be better to check every iteration, but this also works
if path_to_faces_exists:
if face_details := utils.recognize_face(
path_to_directory=path_to_faces,
run_frame=run_frame,
min_confidence=args.face_confidence_threshold,
no_remove_representations=args.no_remove_representations, no_remove_representations=args.no_remove_representations,
):
plot_boxes.append(face_details)
objects_and_peoples = notify.thing_detected(
thing_name=face_details["label"],
objects_and_peoples=objects_and_peoples,
detection_type="peoples",
detection_window=args.detection_window, detection_window=args.detection_window,
detection_duration=args.detection_duration, detection_duration=args.detection_duration,
notification_window=args.notification_window, notification_window=args.notification_window,
ntfy_url=args.ntfy_url, ntfy_url=args.ntfy_url,
model=model,
detect_object=args.detect_object,
object_confidence_threshold=args.object_confidence_threshold,
) )
# The following is stuff for objects
# Setup dictionary of object names
if (
objects_and_peoples["objects"] == {}
or objects_and_peoples["objects"] is None
):
for name in r.names.values():
objects_and_peoples["objects"][name] = {
"last_detection_time": None,
"detection_duration": None,
# "first_detection_time": None,
"last_notification_time": None,
}
# Also, make sure that the objects to detect are in the list of objects_and_peoples
# If it isn't, print a warning
for obj in args.detect_object:
if obj not in objects_and_peoples:
print(
f"Warning: {obj} is not in the list of objects the model can detect!"
) )
for box in r.boxes:
# Get the name of the object
class_id = r.names[box.cls[0].item()]
# Get the coordinates of the object
cords = box.xyxy[0].tolist()
cords = [round(x) for x in cords]
# Get the confidence
conf = round(box.conf[0].item(), 2)
# Print it out, adding a spacer between each object
# print("Object type:", class_id)
# print("Coordinates:", cords)
# print("Probability:", conf)
# print("---")
# Now do stuff (if conf > 0.5)
if conf < args.object_confidence_threshold or (
class_id not in args.detect_object and args.detect_object != []
):
# If the confidence is too low
# or if the object is not in the list of objects to detect and the list of objects to detect is not empty
# then skip this iteration
continue
# Add the object to the list of objects to plot
plot_boxes.append(
{
"label": class_id,
"x1": cords[0],
"y1": cords[1],
"x2": cords[2],
"y2": cords[3],
}
)
objects_and_peoples = notify.thing_detected(
thing_name=class_id,
objects_and_peoples=objects_and_peoples,
detection_type="objects",
detection_window=args.detection_window,
detection_duration=args.detection_duration,
notification_window=args.notification_window,
ntfy_url=args.ntfy_url,
)
# To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
frame_to_show = utils.plot_label(
boxes=plot_boxes,
full_frame=frame,
# full_frame=r.plot(),
run_scale=args.run_scale,
view_scale=args.view_scale,
)
# Display the resulting frame # Display the resulting frame
# cv2.imshow("", r)
if not args.no_display: if not args.no_display:
for i, frame_to_show in enumerate(frames_to_show): cv2.imshow(f"Video{i}", frame_to_show)
cv2.imshow(f"Video {i}", frame_to_show)
# Hit 'q' on the keyboard to quit! # Hit 'q' on the keyboard to quit!
if cv2.waitKey(1) & 0xFF == ord("q"): if cv2.waitKey(1) & 0xFF == ord("q"):
@ -136,7 +192,7 @@ def main():
# Release handle to the webcam # Release handle to the webcam
print("Releasing video capture") print("Releasing video capture")
[source.release() for sources in video_sources.values() for source in sources] video_capture.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()

View File

@ -15,35 +15,31 @@ def set_argparse():
else: else:
print("No .env file found") print("No .env file found")
# One important thing to consider is that most function parameters are optional and have a default value # One important thing to consider is that most function parameters are optional and have a default value
# However, with argparse, those are never used since a argparse always passes something, even if it's None # However, with argparse, those are never used since a argparse always passes something, even if it's None
argparser = argparse.ArgumentParser( argparser = argparse.ArgumentParser(
prog="Wyzely Detect", prog="Wyzely Detect",
description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices", # noqa: E501 description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices", # noqa: E501
epilog="For env bool options, setting them to anything except for an empty string will enable them.", epilog=":)",
) )
video_options = argparser.add_argument_group("Video Options") video_options = argparser.add_argument_group("Video Options")
stream_source = video_options.add_mutually_exclusive_group() stream_source = video_options.add_mutually_exclusive_group()
stream_source.add_argument( stream_source.add_argument(
"--rtsp-url", "--rtsp-url",
action="append", default=os.environ["RTSP_URL"]
# If RTSP_URL is in the environment, use it, otherwise just use a blank list
# This may cause problems down the road, but if it does, env for this can be removed
default=[os.environ["RTSP_URL"]]
if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != "" if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != ""
else [], else None, # noqa: E501
type=str, type=str,
help="RTSP camera URL", help="RTSP camera URL",
) )
stream_source.add_argument( stream_source.add_argument(
"--capture-device", "--capture-device",
action="append", default=os.environ["CAPTURE_DEVICE"]
# If CAPTURE_DEVICE is in the environment, use it, otherwise just use a blank list
# If __main__.py detects that no capture device or remote stream is set, it will default to 0
default=[int(os.environ["CAPTURE_DEVICE"])]
if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != "" if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
else [], else 0, # noqa: E501
type=int, type=int,
help="Capture device number", help="Capture device number",
) )
@ -71,20 +67,16 @@ def set_argparse():
video_options.add_argument( video_options.add_argument(
"--no-display", "--no-display",
default=os.environ["NO_DISPLAY"] default=os.environ["NO_DISPLAY"]
if "NO_DISPLAY" in os.environ if "NO_DISPLAY" in os.environ and os.environ["NO_DISPLAY"] != ""
and os.environ["NO_DISPLAY"] != ""
and os.environ["NO_DISPLAY"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Don't display the video feed", help="Don't display the video feed",
) )
video_options.add_argument( video_options.add_argument(
"-c", '-c',
"--force-disable-tensorflow-gpu", '--force-disable-tensorflow-gpu',
default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"]
if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot", help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot",
@ -100,7 +92,6 @@ def set_argparse():
help="The URL to send notifications to", help="The URL to send notifications to",
) )
# Various timers
timers = argparser.add_argument_group("Timers") timers = argparser.add_argument_group("Timers")
timers.add_argument( timers.add_argument(
"--detection-duration", "--detection-duration",
@ -128,6 +119,7 @@ def set_argparse():
help="The time (seconds) before another notification can be sent", help="The time (seconds) before another notification can be sent",
) )
face_recognition = argparser.add_argument_group("Face Recognition options") face_recognition = argparser.add_argument_group("Face Recognition options")
face_recognition.add_argument( face_recognition.add_argument(
"--faces-directory", "--faces-directory",
@ -151,17 +143,17 @@ def set_argparse():
default=os.environ["NO_REMOVE_REPRESENTATIONS"] default=os.environ["NO_REMOVE_REPRESENTATIONS"]
if "NO_REMOVE_REPRESENTATIONS" in os.environ if "NO_REMOVE_REPRESENTATIONS" in os.environ
and os.environ["NO_REMOVE_REPRESENTATIONS"] != "" and os.environ["NO_REMOVE_REPRESENTATIONS"] != ""
and os.environ["NO_REMOVE_REPRESENTATIONS"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created", # noqa: E501 help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created", # noqa: E501
) )
object_detection = argparser.add_argument_group("Object Detection options") object_detection = argparser.add_argument_group("Object Detection options")
object_detection.add_argument( object_detection.add_argument(
"--detect-object", "--detect-object",
action="append", nargs="*",
# Stuff is appended to default, as far as I can tell
default=[], default=[],
type=str, type=str,
help="The object(s) to detect. Must be something the model is trained to detect", help="The object(s) to detect. Must be something the model is trained to detect",
@ -171,25 +163,11 @@ def set_argparse():
default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"] default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"]
if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != "" and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != ""
# I think this should always be a str so using lower shouldn't be a problem. else 0.6,
# Also, if the first check fails the rest shouldn't be run
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"].lower() != "false" else 0.6,
type=float, type=float,
help="The confidence threshold to use", help="The confidence threshold to use",
) )
debug = argparser.add_argument_group("Debug options")
debug.add_argument(
"--fake-second-source",
help="Duplicate the first source and use it as a second source. Capture device takes priority.",
action="store_true",
default=os.environ["FAKE_SECOND_SOURCE"]
if "FAKE_SECOND_SOURCE" in os.environ
and os.environ["FAKE_SECOND_SOURCE"] != ""
and os.environ["FAKE_SECOND_SOURCE"].lower() != "false"
else False,
)
# return argparser # return argparser

View File

@ -1,163 +1,10 @@
import cv2 import cv2
import os
import numpy as np import numpy as np
from pathlib import Path from pathlib import Path
from deepface import DeepFace
# https://stackoverflow.com/a/42121886/18270659
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
from deepface import DeepFace # noqa: E402
from . import notify # noqa: E402
first_face_try = True first_face_try = True
# TODO: When multi-camera support is ~~added~~ improved, this will need to be changed so that each camera has its own dict
objects_and_peoples = {
"objects": {},
"peoples": {},
}
def process_footage(
# Frame
frame: np.ndarray = None,
# scale
run_scale: float = None,
view_scale: float = None,
# Face stuff
faces_directory: str = None,
face_confidence_threshold: float = None,
no_remove_representations: bool = False,
# Timer stuff
detection_window: int = None,
detection_duration: int = None,
notification_window: int = None,
ntfy_url: str = None,
# Object stuff
# YOLO object
model=None,
detect_object: list = None,
object_confidence_threshold=None,
) -> np.ndarray:
"""Takes in a frame and processes it"""
global objects_and_peoples
# Resize frame of video to a smaller size for faster recognition processing
run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale)
# view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
results = model(run_frame, verbose=False)
path_to_faces = Path(faces_directory)
path_to_faces_exists = path_to_faces.is_dir()
for r in results:
# list of dicts with each dict containing a label, x1, y1, x2, y2
plot_boxes = []
# The following is stuff for people
# This is still in the for loop as each result, no matter if anything is detected, will be present.
# Thus, there will always be one result (r)
# Only run if path_to_faces exists
# May be better to check every iteration, but this also works
if path_to_faces_exists:
if face_details := recognize_face(
path_to_directory=path_to_faces,
run_frame=run_frame,
# Perhaps make these names match?
min_confidence=face_confidence_threshold,
no_remove_representations=no_remove_representations,
):
plot_boxes.append(face_details)
objects_and_peoples = notify.thing_detected(
thing_name=face_details["label"],
objects_and_peoples=objects_and_peoples,
detection_type="peoples",
detection_window=detection_window,
detection_duration=detection_duration,
notification_window=notification_window,
ntfy_url=ntfy_url,
)
# The following is stuff for objects
# Setup dictionary of object names
if (
objects_and_peoples["objects"] == {}
or objects_and_peoples["objects"] is None
):
for name in r.names.values():
objects_and_peoples["objects"][name] = {
"last_detection_time": None,
"detection_duration": None,
# "first_detection_time": None,
"last_notification_time": None,
}
# Also, make sure that the objects to detect are in the list of objects_and_peoples
# If it isn't, print a warning
for obj in detect_object:
# .keys() shouldn't be needed
if obj not in objects_and_peoples["objects"]:
print(
f"Warning: {obj} is not in the list of objects the model can detect!"
)
for box in r.boxes:
# Get the name of the object
class_id = r.names[box.cls[0].item()]
# Get the coordinates of the object
cords = box.xyxy[0].tolist()
cords = [round(x) for x in cords]
# Get the confidence
conf = round(box.conf[0].item(), 2)
# Print it out, adding a spacer between each object
# print("Object type:", class_id)
# print("Coordinates:", cords)
# print("Probability:", conf)
# print("---")
# Now do stuff (if conf > 0.5)
if conf < object_confidence_threshold or (
class_id not in detect_object and detect_object != []
):
# If the confidence is too low
# or if the object is not in the list of objects to detect and the list of objects to detect is not empty
# then skip this iteration
continue
# Add the object to the list of objects to plot
plot_boxes.append(
{
"label": class_id,
"x1": cords[0],
"y1": cords[1],
"x2": cords[2],
"y2": cords[3],
}
)
objects_and_peoples = notify.thing_detected(
thing_name=class_id,
objects_and_peoples=objects_and_peoples,
detection_type="objects",
detection_window=detection_window,
detection_duration=detection_duration,
notification_window=notification_window,
ntfy_url=ntfy_url,
)
# To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
frame_to_show = plot_label(
boxes=plot_boxes,
full_frame=frame,
# full_frame=r.plot(),
run_scale=run_scale,
view_scale=view_scale,
)
# Unsure if this should also return the objects_and_peoples dict
return frame_to_show
def plot_label( def plot_label(
# list of dicts with each dict containing a label, x1, y1, x2, y2 # list of dicts with each dict containing a label, x1, y1, x2, y2
@ -171,7 +18,7 @@ def plot_label(
# So the coordinates will be scaled appropriately when coming from run_frame # So the coordinates will be scaled appropriately when coming from run_frame
view_scale: float = None, view_scale: float = None,
font: int = cv2.FONT_HERSHEY_SIMPLEX, font: int = cv2.FONT_HERSHEY_SIMPLEX,
) -> np.ndarray: ):
# x1 and y1 are the top left corner of the box # x1 and y1 are the top left corner of the box
# x2 and y2 are the bottom right corner of the box # x2 and y2 are the bottom right corner of the box
# Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25 # Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25
@ -230,8 +77,7 @@ def recognize_face(
Returns a single dictonary as currently only 1 face can be detected in each frame Returns a single dictonary as currently only 1 face can be detected in each frame
Cosine threshold is 0.3, so if the confidence is less than that, it will return None Cosine threshold is 0.3, so if the confidence is less than that, it will return None
dict conta # Maybe use os.exit() instead? dict contains the following keys: label, x1, y1, x2, y2
ins the following keys: label, x1, y1, x2, y2
The directory should be structured as follows: The directory should be structured as follows:
faces/ faces/
name/ name/
@ -278,11 +124,8 @@ def recognize_face(
model_name="ArcFace", model_name="ArcFace",
detector_backend="opencv", detector_backend="opencv",
) )
'''
Example dataframe, for reference except (ValueError) as e:
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \\_('_')_/)
'''
except ValueError as e:
if ( if (
str(e) str(e)
== "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False." # noqa: E501 == "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False." # noqa: E501
@ -291,8 +134,7 @@ def recognize_face(
return None return None
elif ( elif (
# Check if the error message contains "Validate .jpg or .png files exist in this path." # Check if the error message contains "Validate .jpg or .png files exist in this path."
"Validate .jpg or .png files exist in this path." "Validate .jpg or .png files exist in this path." in str(e)
in str(e)
): ):
# If a verbose/silent flag is added, this should be changed to print only if verbose is true # If a verbose/silent flag is added, this should be changed to print only if verbose is true
# print("No faces found in database") # print("No faces found in database")
@ -334,4 +176,8 @@ def recognize_face(
f"Cosine similarity: {cosine_similarity}, filname: {path_to_image.name}, to_return: {to_return}" f"Cosine similarity: {cosine_similarity}, filname: {path_to_image.name}, to_return: {to_return}"
) )
return to_return return to_return
return None
"""
Example dataframe, for reference
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
"""