Compare commits

...

21 Commits

Author SHA1 Message Date
slashtechno d56cee6751
Merge pull request #11 from slashtechno/multi-camera-support
Added support for multiple video sources
2024-02-16 13:20:31 -06:00
slashtechno f7f5db9f41
Updated TODO comments 2024-02-16 13:17:43 -06:00
deepsource-autofix[bot] 835e19ed18
refactor: autofix issues in 1 file
Resolved issues in wyzely_detect/utils/utils.py with DeepSource Autofix
2024-02-16 19:13:48 +00:00
slashtechno 4285be54b7
Replaced string with multi-line comment 2024-02-16 13:11:41 -06:00
slashtechno 5c1a22fa72
Update max line length (Deepsource) 2024-02-16 13:11:01 -06:00
slashtechno 37d39d434f
Fixed the following:
* Unused global var `objects_and_peoples` (`__main.py__`)
* Random `print` (`__main.py__`)
* Unescaped backslash (`utils.py`)
2024-02-16 13:05:27 -06:00
slashtechno c7d488d993
Fix various minor issues and format code 2024-02-16 13:01:02 -06:00
slashtechno b48edef250
Python `3.10.12` seems to work well 2024-02-16 12:48:52 -06:00
slashtechno bbcede0b3e
Check if video source is valid 2024-02-16 12:45:31 -06:00
slashtechno 8f500e0186
Multiple sources can now be used
Added `--fake-second-source` to help with debugging
2024-02-16 12:34:39 -06:00
slashtechno 494708a376
Suppress TensorFlow warnings; update dependencies
Also use PrettyTable to list source resolution
2024-02-11 15:41:46 -06:00
slashtechno e9ace0f5e1
Merge pull request #10 from slashtechno/fix-dependency-issues
Make PyTorch GPU functionality optional
2024-02-11 14:29:13 -06:00
slashtechno 1a09004e3f
Made GPU capability toggleable 2024-02-11 14:27:05 -06:00
slashtechno 401c5cee16
Don't install TensorFlow with `and-cuda`
Most likely, this will prevent the GPU from being used by Deepface.
Thus, the optimal solution would be to do something similar to Torch where the GPU capability is optional.
2024-02-10 21:48:26 -06:00
slashtechno 3ac460a060
Made PyTorch GPU functionality optional 2024-02-10 21:38:22 -06:00
slashtechno d3c157df4d
(untested) - Added args for multiple sources
Could not test due to dependency problems on Windows
2024-02-10 21:16:11 -06:00
slashtechno 5cc5e04642
Works on MacOS? 2023-12-31 16:37:57 -06:00
slashtechno 82abe8b6d5
Fixed `--detect-object` when specifying multiple objects 2023-12-24 16:07:53 -06:00
slashtechno 06bd1ccbd7
Fixed incorrect warning when using `--detect-object` 2023-12-24 15:28:47 -06:00
slashtechno e7b63126d2
Fixed (?) prior error causing null `frame_to_show` 2023-12-22 16:44:21 -06:00
slashtechno bec1d5b979
Moved processing to `utils/utils.py`
Crashes when another face is introduced
2023-12-22 15:22:01 -06:00
9 changed files with 1130 additions and 1034 deletions

.deepsource.toml

@@ -5,6 +5,8 @@ name = "python"

 [analyzers.meta]
 runtime_version = "3.x.x"
+max_line_length = 135

 [[analyzers]]
 name = "docker"

.python-version

@@ -1 +1 @@
-3.10.5
+3.10.12

.vscode/launch.json (12 changed lines)

@@ -10,10 +10,20 @@
             "request": "launch",
             "module": "wyzely_detect",
             "args": [
-                "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations"
+                "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--fake-second-source"
             ],
             "justMyCode": true
         },
+        // {
+        //     "name": "Quick, Specific Debug",
+        //     "type": "python",
+        //     "request": "launch",
+        //     "module": "wyzely_detect",
+        //     "args": [
+        //         "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--detect-object", "person", "--detect-object", "cell phone"
+        //     ],
+        //     "justMyCode": true
+        // },
         {
             // "name": "Python: Module",
             "name": "Full Debug",

README.md

@@ -16,6 +16,9 @@ Recognize faces/objects in a video stream (from a webcam or a security camera) a
 - All RTSP feeds _should_ work, however.
 - Python 3.10 or 3.11
 - Poetry (optional)
+- Windows or Linux
+    - I've tested this on MacOS - it works on my 2014 MacBook Air but not a 2011 MacBook Pro
+    - Both were upgraded with OpenCore, with the MacBook Air running Monterey and the MacBook Pro running a newer version of MacOS, which may have been the problem

 ### Docker
 - A Wyze Cam
@@ -46,6 +49,7 @@ This assumes you have Python 3.10 or 3.11 installed
 #### Poetry
 1. `poetry install`
+    a. For GPU support, use `poetry install -E cuda --with gpu`
 2. `poetry run -- wyzely-detect`

 ### Configuration
 The following are some basic CLI options. Most flags have environment variable equivalents which can be helpful when using Docker.

poetry.lock (1638 changed lines, generated)

File diff suppressed because it is too large

pyproject.toml

@@ -21,11 +21,12 @@ ultralytics = "^8.0.190"
 hjson = "^3.1.0"
 numpy = "^1.23.2"
-# https://github.com/python-poetry/poetry/issues/6409
-torch = ">=2.0.0, !=2.0.1, !=2.1.0"
+# https://github.com/python-poetry/poetry/issues/6409#issuecomment-1911735833
+# To install with GPU, use poetry install -E cuda --with gpu
+torch = {version = "2.1.*", source = "pytorch-cpu", markers = "extra!='cuda'" }
 # https://stackoverflow.com/a/76477590/18270659
 # https://discuss.tensorflow.org/t/tensorflow-io-gcs-filesystem-with-windows/18849/4
 # Might be able to remove this version constraint later
 # Working versions:
 # Python version 3.10.12 and 3.10.5 both work
@@ -33,10 +34,33 @@ torch = ">=2.0.0, !=2.0.1, !=2.1.0"
 # cuDNN version - 8.8.1
 # Installed from Nvidia website - nvidia-cuda-toolkit is not installed, but default PopOS drivers are installed
 tensorflow-io-gcs-filesystem = "0.31.0"
-tensorflow = {version = "^2.14.0", extras = ["and-cuda"]}
+tensorflow = {version = "^2.14.0", markers = "extra!='cuda'"}
 deepface = "^0.0.79"
+prettytable = "^3.9.0"
+
+[tool.poetry.group.gpu]
+optional = true
+
+[tool.poetry.group.gpu.dependencies]
+torch = {version = "2.1.*", source = "pytorch-cu121", markers = "extra=='cuda'"}
+tensorflow = {version = "^2.14.0", extras = ["and-cuda"], markers = "extra=='cuda'"}
+
+[tool.poetry.extras]
+# Might be better to rename this to nocpu since it's more accurate
+cuda = []
+
+[[tool.poetry.source]]
+name = "pytorch-cpu"
+url = "https://download.pytorch.org/whl/cpu"
+priority = "explicit"
+
+[[tool.poetry.source]]
+name = "pytorch-cu121"
+url = "https://download.pytorch.org/whl/cu121"
+priority = "explicit"
+
 [tool.poetry.group.dev.dependencies]
 black = "^23.9.1"
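Aside: the GPU opt-in introduced above hinges on PEP 508 environment markers. With no extras requested, `extra != 'cuda'` evaluates true, so Poetry resolves the CPU-only torch wheel from the `pytorch-cpu` source and plain tensorflow; `poetry install -E cuda --with gpu` enables the `cuda` extra plus the `gpu` group, flipping the markers so the cu121 and `and-cuda` variants are selected instead. A minimal sketch of the marker logic using the `packaging` library (illustrative only, not part of this diff):

    from packaging.markers import Marker

    cpu_only = Marker("extra != 'cuda'")
    gpu_only = Marker("extra == 'cuda'")

    # Default install: no extras requested, so the CPU-pinned dependencies apply
    print(cpu_only.evaluate({"extra": ""}))      # True
    print(gpu_only.evaluate({"extra": ""}))      # False

    # poetry install -E cuda --with gpu: the cuda extra flips both markers
    print(cpu_only.evaluate({"extra": "cuda"}))  # False
    print(gpu_only.evaluate({"extra": "cuda"}))  # True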

wyzely_detect/__main__.py

@@ -1,28 +1,22 @@
 # import face_recognition
 from pathlib import Path
-import os
 import cv2
+import sys
+from prettytable import PrettyTable

 # import hjson as json
 import torch
 from ultralytics import YOLO

-from .utils import notify, utils
+from .utils import utils
 from .utils.cli_args import argparser

 DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

 args = None

-objects_and_peoples = {
-    "objects": {},
-    "peoples": {},
-}

 def main():
-    global objects_and_peoples
     global args
-    # RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE")
+    # Replace this with code to check for gpu

     args = argparser.parse_args()
@@ -30,7 +24,7 @@ def main():
     # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
     # Currently, I have been unable to set up Poetry to use GPU for Torch
     for i in range(torch.cuda.device_count()):
-        print(f'Using {torch.cuda.get_device_properties(i).name} for pytorch')
+        print(f"Using {torch.cuda.get_device_properties(i).name} for pytorch")
     if torch.cuda.is_available():
         torch.cuda.set_device(0)
         print("Set CUDA device")
@@ -41,9 +35,10 @@ def main():
     if args.force_disable_tensorflow_gpu:
         print("Forcing tensorflow to use CPU")
         import tensorflow as tf
-        tf.config.set_visible_devices([], 'GPU')
-        if tf.config.experimental.list_logical_devices('GPU'):
-            print('GPU disabled unsuccessfully')
+
+        tf.config.set_visible_devices([], "GPU")
+        if tf.config.experimental.list_logical_devices("GPU"):
+            print("GPU disabled unsuccessfully")
         else:
             print("GPU disabled successfully")
@@ -51,140 +46,89 @@ def main():
     # Depending on if the user wants to use a stream or a capture device,
     # Set the video capture to the appropriate source
-    if args.rtsp_url is not None:
-        video_capture = cv2.VideoCapture(args.rtsp_url)
+    if not args.rtsp_url and not args.capture_device:
+        print("No stream or capture device set, defaulting to capture device 0")
+        video_sources = {"devices": [cv2.VideoCapture(0)]}
     else:
-        video_capture = cv2.VideoCapture(args.capture_device)
+        video_sources = {
+            "streams": [cv2.VideoCapture(url) for url in args.rtsp_url],
+            "devices": [cv2.VideoCapture(device) for device in args.capture_device],
+        }
+
+    if args.fake_second_source:
+        try:
+            video_sources["devices"].append(video_sources["devices"][0])
+        except KeyError:
+            print("No capture device to use as second source. Trying stream.")
+            try:
+                video_sources["devices"].append(video_sources["devices"][0])
+            except KeyError:
+                print("No stream to use as a second source")
+                # When the code tries to resize the nonexistent capture device 1, the program will fail

     # Eliminate lag by setting the buffer size to 1
     # This makes it so that the video capture will only grab the most recent frame
     # However, this means that the video may be choppy
-    video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
-
-    # Print the resolution of the video
-    print(
-        f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}"  # noqa: E501
-    )
+    # Only do this for streams
+    try:
+        for stream in video_sources["streams"]:
+            stream.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+    # If there are no streams, this will throw a KeyError
+    except KeyError:
+        pass
+
+    # Print out the resolution of the video sources. Ideally, change this so the device ID/url is also printed
+    pretty_table = PrettyTable(field_names=["Source Type", "Resolution"])
+    for source_type, sources in video_sources.items():
+        for source in sources:
+            if (
+                source.get(cv2.CAP_PROP_FRAME_WIDTH) == 0
+                or source.get(cv2.CAP_PROP_FRAME_HEIGHT) == 0
+            ):
+                message = "Capture for a source failed as resolution is 0x0.\n"
+                if source_type == "streams":
+                    message += "Check if the stream URL is correct and if the stream is online."
+                else:
+                    message += "Check if the capture device is connected, working, and not in use by another program."
+                print(message)
+                sys.exit(1)
+            pretty_table.add_row(
+                [
+                    source_type,
+                    f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}",
+                ]
+            )
+    print(pretty_table)

     print("Beginning video capture...")
     while True:
         # Grab a single frame of video
-        ret, frame = video_capture.read()
-        # Resize frame of video to a smaller size for faster recognition processing
-        run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale)
-        # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
-
-        results = model(run_frame, verbose=False)
-
-        path_to_faces = Path(args.faces_directory)
-        path_to_faces_exists = path_to_faces.is_dir()
-
-        for i, r in enumerate(results):
-            # list of dicts with each dict containing a label, x1, y1, x2, y2
-            plot_boxes = []
-
-            # The following is stuff for people
-            # This is still in the for loop as each result, no matter if anything is detected, will be present.
-            # Thus, there will always be one result (r)
-            # Only run if path_to_faces exists
-            # May be better to check every iteration, but this also works
-            if path_to_faces_exists:
-                if face_details := utils.recognize_face(
-                    path_to_directory=path_to_faces,
-                    run_frame=run_frame,
-                    min_confidence=args.face_confidence_threshold,
-                    no_remove_representations=args.no_remove_representations,
-                ):
-                    plot_boxes.append(face_details)
-                    objects_and_peoples = notify.thing_detected(
-                        thing_name=face_details["label"],
-                        objects_and_peoples=objects_and_peoples,
-                        detection_type="peoples",
-                        detection_window=args.detection_window,
-                        detection_duration=args.detection_duration,
-                        notification_window=args.notification_window,
-                        ntfy_url=args.ntfy_url,
-                    )
-
-            # The following is stuff for objects
-            # Setup dictionary of object names
-            if (
-                objects_and_peoples["objects"] == {}
-                or objects_and_peoples["objects"] is None
-            ):
-                for name in r.names.values():
-                    objects_and_peoples["objects"][name] = {
-                        "last_detection_time": None,
-                        "detection_duration": None,
-                        # "first_detection_time": None,
-                        "last_notification_time": None,
-                    }
-
-            # Also, make sure that the objects to detect are in the list of objects_and_peoples
-            # If it isn't, print a warning
-            for obj in args.detect_object:
-                if obj not in objects_and_peoples:
-                    print(
-                        f"Warning: {obj} is not in the list of objects the model can detect!"
-                    )
-
-            for box in r.boxes:
-                # Get the name of the object
-                class_id = r.names[box.cls[0].item()]
-                # Get the coordinates of the object
-                cords = box.xyxy[0].tolist()
-                cords = [round(x) for x in cords]
-                # Get the confidence
-                conf = round(box.conf[0].item(), 2)
-                # Print it out, adding a spacer between each object
-                # print("Object type:", class_id)
-                # print("Coordinates:", cords)
-                # print("Probability:", conf)
-                # print("---")
-
-                # Now do stuff (if conf > 0.5)
-                if conf < args.object_confidence_threshold or (
-                    class_id not in args.detect_object and args.detect_object != []
-                ):
-                    # If the confidence is too low
-                    # or if the object is not in the list of objects to detect and the list of objects to detect is not empty
-                    # then skip this iteration
-                    continue
-
-                # Add the object to the list of objects to plot
-                plot_boxes.append(
-                    {
-                        "label": class_id,
-                        "x1": cords[0],
-                        "y1": cords[1],
-                        "x2": cords[2],
-                        "y2": cords[3],
-                    }
-                )
-                objects_and_peoples = notify.thing_detected(
-                    thing_name=class_id,
-                    objects_and_peoples=objects_and_peoples,
-                    detection_type="objects",
-                    detection_window=args.detection_window,
-                    detection_duration=args.detection_duration,
-                    notification_window=args.notification_window,
-                    ntfy_url=args.ntfy_url,
-                )
-
-            # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
-            frame_to_show = utils.plot_label(
-                boxes=plot_boxes,
-                full_frame=frame,
-                # full_frame=r.plot(),
-                run_scale=args.run_scale,
-                view_scale=args.view_scale,
-            )
-
-            # Display the resulting frame
-            # cv2.imshow("", r)
-            if not args.no_display:
-                cv2.imshow(f"Video{i}", frame_to_show)
+        frames = []
+        # frames = [source.read() for sources in video_sources.values() for source in sources]
+        for list_of_sources in video_sources.values():
+            frames.extend([source.read()[1] for source in list_of_sources])
+        frames_to_show = []
+        for frame in frames:
+            frames_to_show.append(
+                utils.process_footage(
+                    frame=frame,
+                    run_scale=args.run_scale,
+                    view_scale=args.view_scale,
+                    faces_directory=Path(args.faces_directory),
+                    face_confidence_threshold=args.face_confidence_threshold,
+                    no_remove_representations=args.no_remove_representations,
+                    detection_window=args.detection_window,
+                    detection_duration=args.detection_duration,
+                    notification_window=args.notification_window,
+                    ntfy_url=args.ntfy_url,
+                    model=model,
+                    detect_object=args.detect_object,
+                    object_confidence_threshold=args.object_confidence_threshold,
+                )
+            )
+        # Display the resulting frame
+        if not args.no_display:
+            for i, frame_to_show in enumerate(frames_to_show):
+                cv2.imshow(f"Video {i}", frame_to_show)

         # Hit 'q' on the keyboard to quit!
         if cv2.waitKey(1) & 0xFF == ord("q"):
@@ -192,7 +136,7 @@ def main():
     # Release handle to the webcam
     print("Releasing video capture")
-    video_capture.release()
+    [source.release() for sources in video_sources.values() for source in sources]
     cv2.destroyAllWindows()
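Aside: the rewritten loop polls every source once per iteration and discards the status flag via `source.read()[1]`. A standalone sketch of the same pattern (device indices are hypothetical; note that `read()` yields a `None` frame when a source stalls, which the code above does not yet guard against):

    import cv2

    # Hypothetical sources: two local capture devices; a stream would be cv2.VideoCapture(url)
    video_sources = {"devices": [cv2.VideoCapture(0), cv2.VideoCapture(1)]}

    while True:
        frames = []
        for list_of_sources in video_sources.values():
            # read() returns (ok, frame); taking read()[1] drops the status flag
            frames.extend([source.read()[1] for source in list_of_sources])
        for i, frame in enumerate(frames):
            if frame is None:  # source failed to deliver this tick; skip it
                continue
            cv2.imshow(f"Video {i}", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    for sources in video_sources.values():
        for source in sources:
            source.release()
    cv2.destroyAllWindows()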

wyzely_detect/utils/cli_args.py

@@ -15,31 +15,35 @@ def set_argparse():
     else:
         print("No .env file found")

     # One important thing to consider is that most function parameters are optional and have a default value
     # However, with argparse, those are never used since a argparse always passes something, even if it's None
     argparser = argparse.ArgumentParser(
         prog="Wyzely Detect",
         description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices",  # noqa: E501
-        epilog=":)",
+        epilog="For env bool options, setting them to anything except for an empty string will enable them.",
     )

     video_options = argparser.add_argument_group("Video Options")
     stream_source = video_options.add_mutually_exclusive_group()
     stream_source.add_argument(
         "--rtsp-url",
-        default=os.environ["RTSP_URL"]
+        action="append",
+        # If RTSP_URL is in the environment, use it, otherwise just use a blank list
+        # This may cause problems down the road, but if it does, env for this can be removed
+        default=[os.environ["RTSP_URL"]]
         if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != ""
-        else None,  # noqa: E501
+        else [],
         type=str,
         help="RTSP camera URL",
     )
     stream_source.add_argument(
         "--capture-device",
-        default=os.environ["CAPTURE_DEVICE"]
+        action="append",
+        # If CAPTURE_DEVICE is in the environment, use it, otherwise just use a blank list
+        # If __main__.py detects that no capture device or remote stream is set, it will default to 0
+        default=[int(os.environ["CAPTURE_DEVICE"])]
         if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
-        else 0,  # noqa: E501
+        else [],
         type=int,
         help="Capture device number",
     )
@@ -67,16 +71,20 @@ def set_argparse():
     video_options.add_argument(
         "--no-display",
         default=os.environ["NO_DISPLAY"]
-        if "NO_DISPLAY" in os.environ and os.environ["NO_DISPLAY"] != ""
+        if "NO_DISPLAY" in os.environ
+        and os.environ["NO_DISPLAY"] != ""
+        and os.environ["NO_DISPLAY"].lower() != "false"
         else False,
         action="store_true",
         help="Don't display the video feed",
     )
     video_options.add_argument(
-        '-c',
-        '--force-disable-tensorflow-gpu',
+        "-c",
+        "--force-disable-tensorflow-gpu",
         default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"]
-        if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
+        if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ
+        and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
+        and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"].lower() != "false"
         else False,
         action="store_true",
         help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot",
@@ -92,6 +100,7 @@ def set_argparse():
         help="The URL to send notifications to",
     )

+    # Various timers
     timers = argparser.add_argument_group("Timers")
     timers.add_argument(
         "--detection-duration",
@@ -119,7 +128,6 @@ def set_argparse():
         help="The time (seconds) before another notification can be sent",
     )
-
     face_recognition = argparser.add_argument_group("Face Recognition options")
     face_recognition.add_argument(
         "--faces-directory",
@@ -143,17 +151,17 @@ def set_argparse():
         default=os.environ["NO_REMOVE_REPRESENTATIONS"]
         if "NO_REMOVE_REPRESENTATIONS" in os.environ
         and os.environ["NO_REMOVE_REPRESENTATIONS"] != ""
+        and os.environ["NO_REMOVE_REPRESENTATIONS"].lower() != "false"
         else False,
         action="store_true",
         help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created",  # noqa: E501
     )

     object_detection = argparser.add_argument_group("Object Detection options")
     object_detection.add_argument(
         "--detect-object",
-        nargs="*",
+        action="append",
+        # Stuff is appended to default, as far as I can tell
         default=[],
         type=str,
         help="The object(s) to detect. Must be something the model is trained to detect",
@@ -163,11 +171,25 @@ def set_argparse():
         default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"]
         if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ
         and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != ""
+        # I think this should always be a str so using lower shouldn't be a problem.
+        # Also, if the first check fails the rest shouldn't be run
+        and os.environ["OBJECT_CONFIDENCE_THRESHOLD"].lower() != "false"
         else 0.6,
         type=float,
         help="The confidence threshold to use",
     )
+
+    debug = argparser.add_argument_group("Debug options")
+    debug.add_argument(
+        "--fake-second-source",
+        help="Duplicate the first source and use it as a second source. Capture device takes priority.",
+        action="store_true",
+        default=os.environ["FAKE_SECOND_SOURCE"]
+        if "FAKE_SECOND_SOURCE" in os.environ
+        and os.environ["FAKE_SECOND_SOURCE"] != ""
+        and os.environ["FAKE_SECOND_SOURCE"].lower() != "false"
+        else False,
+    )

     # return argparser
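Aside: `action="append"` is what lets `--rtsp-url`, `--capture-device`, and `--detect-object` be passed multiple times, each repetition appending to a list. It also explains the "Stuff is appended to default" comment above: argparse appends command-line values to (a copy of) the default list rather than replacing it, so an env-seeded default and CLI flags combine. A standalone illustration (flag names mirror the diff; values are hypothetical):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--rtsp-url", action="append", default=[], type=str)

    print(parser.parse_args([]).rtsp_url)  # []
    print(parser.parse_args(["--rtsp-url", "rtsp://a", "--rtsp-url", "rtsp://b"]).rtsp_url)
    # ['rtsp://a', 'rtsp://b']

    # With a default seeded from an environment variable, CLI values are added to it:
    seeded = argparse.ArgumentParser()
    seeded.add_argument("--capture-device", action="append", default=[0], type=int)
    print(seeded.parse_args(["--capture-device", "1"]).capture_device)  # [0, 1]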

wyzely_detect/utils/utils.py

@@ -1,10 +1,163 @@
 import cv2
+import os
 import numpy as np
 from pathlib import Path
-from deepface import DeepFace
+
+# https://stackoverflow.com/a/42121886/18270659
+os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
+
+from deepface import DeepFace  # noqa: E402
+from . import notify  # noqa: E402

 first_face_try = True

+# TODO: When multi-camera support is ~~added~~ improved, this will need to be changed so that each camera has its own dict
+objects_and_peoples = {
+    "objects": {},
+    "peoples": {},
+}
+
+
+def process_footage(
+    # Frame
+    frame: np.ndarray = None,
+    # scale
+    run_scale: float = None,
+    view_scale: float = None,
+    # Face stuff
+    faces_directory: str = None,
+    face_confidence_threshold: float = None,
+    no_remove_representations: bool = False,
+    # Timer stuff
+    detection_window: int = None,
+    detection_duration: int = None,
+    notification_window: int = None,
+    ntfy_url: str = None,
+    # Object stuff
+    # YOLO object
+    model=None,
+    detect_object: list = None,
+    object_confidence_threshold=None,
+) -> np.ndarray:
+    """Takes in a frame and processes it"""
+    global objects_and_peoples
+
+    # Resize frame of video to a smaller size for faster recognition processing
+    run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale)
+    # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
+
+    results = model(run_frame, verbose=False)
+
+    path_to_faces = Path(faces_directory)
+    path_to_faces_exists = path_to_faces.is_dir()
+
+    for r in results:
+        # list of dicts with each dict containing a label, x1, y1, x2, y2
+        plot_boxes = []
+
+        # The following is stuff for people
+        # This is still in the for loop as each result, no matter if anything is detected, will be present.
+        # Thus, there will always be one result (r)
+        # Only run if path_to_faces exists
+        # May be better to check every iteration, but this also works
+        if path_to_faces_exists:
+            if face_details := recognize_face(
+                path_to_directory=path_to_faces,
+                run_frame=run_frame,
+                # Perhaps make these names match?
+                min_confidence=face_confidence_threshold,
+                no_remove_representations=no_remove_representations,
+            ):
+                plot_boxes.append(face_details)
+                objects_and_peoples = notify.thing_detected(
+                    thing_name=face_details["label"],
+                    objects_and_peoples=objects_and_peoples,
+                    detection_type="peoples",
+                    detection_window=detection_window,
+                    detection_duration=detection_duration,
+                    notification_window=notification_window,
+                    ntfy_url=ntfy_url,
+                )
+
+        # The following is stuff for objects
+        # Setup dictionary of object names
+        if (
+            objects_and_peoples["objects"] == {}
+            or objects_and_peoples["objects"] is None
+        ):
+            for name in r.names.values():
+                objects_and_peoples["objects"][name] = {
+                    "last_detection_time": None,
+                    "detection_duration": None,
+                    # "first_detection_time": None,
+                    "last_notification_time": None,
+                }
+
+        # Also, make sure that the objects to detect are in the list of objects_and_peoples
+        # If it isn't, print a warning
+        for obj in detect_object:
+            # .keys() shouldn't be needed
+            if obj not in objects_and_peoples["objects"]:
+                print(
+                    f"Warning: {obj} is not in the list of objects the model can detect!"
+                )
+
+        for box in r.boxes:
+            # Get the name of the object
+            class_id = r.names[box.cls[0].item()]
+            # Get the coordinates of the object
+            cords = box.xyxy[0].tolist()
+            cords = [round(x) for x in cords]
+            # Get the confidence
+            conf = round(box.conf[0].item(), 2)
+            # Print it out, adding a spacer between each object
+            # print("Object type:", class_id)
+            # print("Coordinates:", cords)
+            # print("Probability:", conf)
+            # print("---")
+
+            # Now do stuff (if conf > 0.5)
+            if conf < object_confidence_threshold or (
+                class_id not in detect_object and detect_object != []
+            ):
+                # If the confidence is too low
+                # or if the object is not in the list of objects to detect and the list of objects to detect is not empty
+                # then skip this iteration
+                continue
+
+            # Add the object to the list of objects to plot
+            plot_boxes.append(
+                {
+                    "label": class_id,
+                    "x1": cords[0],
+                    "y1": cords[1],
+                    "x2": cords[2],
+                    "y2": cords[3],
+                }
+            )
+            objects_and_peoples = notify.thing_detected(
+                thing_name=class_id,
+                objects_and_peoples=objects_and_peoples,
+                detection_type="objects",
+                detection_window=detection_window,
+                detection_duration=detection_duration,
+                notification_window=notification_window,
+                ntfy_url=ntfy_url,
+            )
+
+    # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
+    frame_to_show = plot_label(
+        boxes=plot_boxes,
+        full_frame=frame,
+        # full_frame=r.plot(),
+        run_scale=run_scale,
+        view_scale=view_scale,
+    )
+
+    # Unsure if this should also return the objects_and_peoples dict
+    return frame_to_show
+
+
 def plot_label(
     # list of dicts with each dict containing a label, x1, y1, x2, y2
@@ -18,7 +171,7 @@ def plot_label(
     # So the coordinates will be scaled appropriately when coming from run_frame
     view_scale: float = None,
     font: int = cv2.FONT_HERSHEY_SIMPLEX,
-):
+) -> np.ndarray:
     # x1 and y1 are the top left corner of the box
     # x2 and y2 are the bottom right corner of the box
     # Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25
@@ -72,26 +225,27 @@ def recognize_face(
     no_remove_representations: bool = False,
 ) -> np.ndarray:
     """
     Accepts a path to a directory of images of faces to be used as a refference
     In addition, accepts an opencv image to be used as the frame to be searched

     Returns a single dictonary as currently only 1 face can be detected in each frame
     Cosine threshold is 0.3, so if the confidence is less than that, it will return None
-    dict contains the following keys: label, x1, y1, x2, y2
+    dict conta  # Maybe use os.exit() instead?
+ins the following keys: label, x1, y1, x2, y2
     The directory should be structured as follows:
     faces/
         name/
             image1.jpg
             image2.jpg
             image3.jpg
         name2/
             image1.jpg
             image2.jpg
             image3.jpg
     (not neccessarily jpgs, but you get the idea)

     Point is, `name` is the name of the person in the images in the directory `name`
     That name will be used as the label for the face in the frame
     """

     global first_face_try
@@ -124,8 +278,11 @@ def recognize_face(
             model_name="ArcFace",
             detector_backend="opencv",
         )
-    except (ValueError) as e:
+        '''
+        Example dataframe, for reference
+        identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \\_('_')_/)
+        '''
+    except ValueError as e:
         if (
             str(e)
             == "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False."  # noqa: E501
@@ -134,7 +291,8 @@ def recognize_face(
             return None
         elif (
             # Check if the error message contains "Validate .jpg or .png files exist in this path."
-            "Validate .jpg or .png files exist in this path." in str(e)
+            "Validate .jpg or .png files exist in this path."
+            in str(e)
         ):
             # If a verbose/silent flag is added, this should be changed to print only if verbose is true
             # print("No faces found in database")
@@ -176,8 +334,4 @@ def recognize_face(
             f"Cosine similarity: {cosine_similarity}, filname: {path_to_image.name}, to_return: {to_return}"
         )
         return to_return
     return None
-"""
-Example dataframe, for reference
-identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
-"""