Compare commits

..

8 Commits

Author SHA1 Message Date
slashtechno f7f5db9f41
Updated TODO comments 2024-02-16 13:17:43 -06:00
deepsource-autofix[bot] 835e19ed18
refactor: autofix issues in 1 file
Resolved issues in wyzely_detect/utils/utils.py with DeepSource Autofix
2024-02-16 19:13:48 +00:00
slashtechno 4285be54b7
Replaced string with multi-line comment 2024-02-16 13:11:41 -06:00
slashtechno 37d39d434f
Fixed the following:
* Unused global var `objects_and_peoples` (`__main.py__`)
* Random `print` (`__main.py__`)
* Unescaped backslash (`utils.py`)
2024-02-16 13:05:27 -06:00
slashtechno c7d488d993
Fix various minor issues and format code 2024-02-16 13:01:02 -06:00
slashtechno b48edef250
Python `3.10.12` seems to work well 2024-02-16 12:48:52 -06:00
slashtechno bbcede0b3e
Check if video source is valid 2024-02-16 12:45:31 -06:00
slashtechno 8f500e0186
Multiple sources can now be used
Added `--fake-second-source` to help with debugging
2024-02-16 12:34:39 -06:00
5 changed files with 131 additions and 100 deletions

View File

@ -1 +1 @@
3.11.5
3.10.12

2
.vscode/launch.json vendored
View File

@ -10,7 +10,7 @@
"request": "launch",
"module": "wyzely_detect",
"args": [
"--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations"
"--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--fake-second-source"
],
"justMyCode": true
},

View File

@ -1,8 +1,7 @@
# import face_recognition
from pathlib import Path
import cv2
import os
import sys
from prettytable import PrettyTable
# import hjson as json
@ -17,9 +16,7 @@ args = None
def main():
global objects_and_peoples
global args
global args
args = argparser.parse_args()
@ -27,7 +24,7 @@ def main():
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
# Currently, I have been unable to set up Poetry to use GPU for Torch
for i in range(torch.cuda.device_count()):
print(f'Using {torch.cuda.get_device_properties(i).name} for pytorch')
print(f"Using {torch.cuda.get_device_properties(i).name} for pytorch")
if torch.cuda.is_available():
torch.cuda.set_device(0)
print("Set CUDA device")
@ -38,9 +35,10 @@ def main():
if args.force_disable_tensorflow_gpu:
print("Forcing tensorflow to use CPU")
import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
if tf.config.experimental.list_logical_devices('GPU'):
print('GPU disabled unsuccessfully')
tf.config.set_visible_devices([], "GPU")
if tf.config.experimental.list_logical_devices("GPU"):
print("GPU disabled unsuccessfully")
else:
print("GPU disabled successfully")
@ -50,15 +48,24 @@ def main():
# Set the video capture to the appropriate source
if not args.rtsp_url and not args.capture_device:
print("No stream or capture device set, defaulting to capture device 0")
video_sources = {
"devices": [cv2.VideoCapture(0)]
}
video_sources = {"devices": [cv2.VideoCapture(0)]}
else:
video_sources = {
"streams": [cv2.VideoCapture(url) for url in args.rtsp_url],
"devices": [cv2.VideoCapture(device) for device in args.capture_device],
}
if args.fake_second_source:
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No capture device to use as second source. Trying stream.")
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No stream to use as a second source")
# When the code tries to resize the nonexistent capture device 1, the program will fail
# Eliminate lag by setting the buffer size to 1
# This makes it so that the video capture will only grab the most recent frame
# However, this means that the video may be choppy
@ -74,40 +81,54 @@ def main():
pretty_table = PrettyTable(field_names=["Source Type", "Resolution"])
for source_type, sources in video_sources.items():
for source in sources:
if (
source.get(cv2.CAP_PROP_FRAME_WIDTH) == 0
or source.get(cv2.CAP_PROP_FRAME_HEIGHT) == 0
):
message = "Capture for a source failed as resolution is 0x0.\n"
if source_type == "streams":
message += "Check if the stream URL is correct and if the stream is online."
else:
message += "Check if the capture device is connected, working, and not in use by another program."
print(message)
sys.exit(1)
pretty_table.add_row(
[source_type, f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}"]
[
source_type,
f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}",
]
)
print(pretty_table)
print
print("Beginning video capture...")
while True:
# Grab a single frame of video
ret, frame = video_capture.read()
frame_to_show = utils.process_footage(
frame = frame,
run_scale = args.run_scale,
view_scale = args.view_scale,
faces_directory=Path(args.faces_directory),
face_confidence_threshold=args.face_confidence_threshold,
no_remove_representations=args.no_remove_representations,
detection_window=args.detection_window,
detection_duration=args.detection_duration,
notification_window=args.notification_window,
ntfy_url=args.ntfy_url,
model=model,
detect_object=args.detect_object,
object_confidence_threshold=args.object_confidence_threshold,
)
frames = []
# frames = [source.read() for sources in video_sources.values() for source in sources]
for list_of_sources in video_sources.values():
frames.extend([source.read()[1] for source in list_of_sources])
frames_to_show = []
for frame in frames:
frames_to_show.append(
utils.process_footage(
frame=frame,
run_scale=args.run_scale,
view_scale=args.view_scale,
faces_directory=Path(args.faces_directory),
face_confidence_threshold=args.face_confidence_threshold,
no_remove_representations=args.no_remove_representations,
detection_window=args.detection_window,
detection_duration=args.detection_duration,
notification_window=args.notification_window,
ntfy_url=args.ntfy_url,
model=model,
detect_object=args.detect_object,
object_confidence_threshold=args.object_confidence_threshold,
)
)
# Display the resulting frame
# TODO: When multi-camera support is added, this needs to be changed to allow all feeds
if not args.no_display:
cv2.imshow("Video", frame_to_show)
for i, frame_to_show in enumerate(frames_to_show):
cv2.imshow(f"Video {i}", frame_to_show)
# Hit 'q' on the keyboard to quit!
if cv2.waitKey(1) & 0xFF == ord("q"):
@ -115,7 +136,7 @@ def main():
# Release handle to the webcam
print("Releasing video capture")
video_capture.release()
[source.release() for sources in video_sources.values() for source in sources]
cv2.destroyAllWindows()

View File

@ -15,16 +15,14 @@ def set_argparse():
else:
print("No .env file found")
# One important thing to consider is that most function parameters are optional and have a default value
# However, with argparse, those are never used since a argparse always passes something, even if it's None
argparser = argparse.ArgumentParser(
prog="Wyzely Detect",
description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices", # noqa: E501
epilog=":)",
epilog="For env bool options, setting them to anything except for an empty string will enable them.",
)
video_options = argparser.add_argument_group("Video Options")
stream_source = video_options.add_mutually_exclusive_group()
stream_source.add_argument(
@ -32,7 +30,9 @@ def set_argparse():
action="append",
# If RTSP_URL is in the environment, use it, otherwise just use a blank list
# This may cause problems down the road, but if it does, env for this can be removed
default=[os.environ["RTSP_URL"]] if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != "" else [],
default=[os.environ["RTSP_URL"]]
if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != ""
else [],
type=str,
help="RTSP camera URL",
)
@ -41,7 +41,9 @@ def set_argparse():
action="append",
# If CAPTURE_DEVICE is in the environment, use it, otherwise just use a blank list
# If __main__.py detects that no capture device or remote stream is set, it will default to 0
default=[int(os.environ["CAPTURE_DEVICE"])] if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != "" else [],
default=[int(os.environ["CAPTURE_DEVICE"])]
if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
else [],
type=int,
help="Capture device number",
)
@ -69,16 +71,20 @@ def set_argparse():
video_options.add_argument(
"--no-display",
default=os.environ["NO_DISPLAY"]
if "NO_DISPLAY" in os.environ and os.environ["NO_DISPLAY"] != ""
if "NO_DISPLAY" in os.environ
and os.environ["NO_DISPLAY"] != ""
and os.environ["NO_DISPLAY"].lower() != "false"
else False,
action="store_true",
help="Don't display the video feed",
)
video_options.add_argument(
'-c',
'--force-disable-tensorflow-gpu',
"-c",
"--force-disable-tensorflow-gpu",
default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"]
if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"].lower() != "false"
else False,
action="store_true",
help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot",
@ -122,7 +128,6 @@ def set_argparse():
help="The time (seconds) before another notification can be sent",
)
face_recognition = argparser.add_argument_group("Face Recognition options")
face_recognition.add_argument(
"--faces-directory",
@ -146,13 +151,12 @@ def set_argparse():
default=os.environ["NO_REMOVE_REPRESENTATIONS"]
if "NO_REMOVE_REPRESENTATIONS" in os.environ
and os.environ["NO_REMOVE_REPRESENTATIONS"] != ""
and os.environ["NO_REMOVE_REPRESENTATIONS"].lower() != "false"
else False,
action="store_true",
help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created", # noqa: E501
)
object_detection = argparser.add_argument_group("Object Detection options")
object_detection.add_argument(
"--detect-object",
@ -167,11 +171,25 @@ def set_argparse():
default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"]
if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != ""
else 0.6,
# I think this should always be a str so using lower shouldn't be a problem.
# Also, if the first check fails the rest shouldn't be run
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"].lower() != "false" else 0.6,
type=float,
help="The confidence threshold to use",
)
debug = argparser.add_argument_group("Debug options")
debug.add_argument(
"--fake-second-source",
help="Duplicate the first source and use it as a second source. Capture device takes priority.",
action="store_true",
default=os.environ["FAKE_SECOND_SOURCE"]
if "FAKE_SECOND_SOURCE" in os.environ
and os.environ["FAKE_SECOND_SOURCE"] != ""
and os.environ["FAKE_SECOND_SOURCE"].lower() != "false"
else False,
)
# return argparser

View File

@ -2,16 +2,17 @@ import cv2
import os
import numpy as np
from pathlib import Path
# https://stackoverflow.com/a/42121886/18270659
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
from deepface import DeepFace # noqa: E402
from . import notify # noqa: E402
from deepface import DeepFace # noqa: E402
from . import notify # noqa: E402
first_face_try = True
# TODO: When multi-camera support is added, this will need to be changed so that each camera has its own dict
# TODO: When multi-camera support is ~~added~~ improved, this will need to be changed so that each camera has its own dict
objects_and_peoples = {
"objects": {},
"peoples": {},
@ -21,36 +22,27 @@ objects_and_peoples = {
def process_footage(
# Frame
frame: np.ndarray = None,
# scale
run_scale: float = None,
view_scale: float = None,
# Face stuff
faces_directory: str = None,
face_confidence_threshold: float = None,
no_remove_representations: bool = False,
# Timer stuff
detection_window: int = None,
detection_duration: int = None,
notification_window: int = None,
ntfy_url: str = None,
# Object stuff
# YOLO object
model = None,
model=None,
detect_object: list = None,
object_confidence_threshold = None
object_confidence_threshold=None,
) -> np.ndarray:
"""
Takes in a frame and processes it
"""
"""Takes in a frame and processes it"""
global objects_and_peoples
# Resize frame of video to a smaller size for faster recognition processing
run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale)
# view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
@ -60,7 +52,7 @@ def process_footage(
path_to_faces = Path(faces_directory)
path_to_faces_exists = path_to_faces.is_dir()
for i, r in enumerate(results):
for r in results:
# list of dicts with each dict containing a label, x1, y1, x2, y2
plot_boxes = []
@ -105,7 +97,8 @@ def process_footage(
# Also, make sure that the objects to detect are in the list of objects_and_peoples
# If it isn't, print a warning
for obj in detect_object:
if obj not in objects_and_peoples["objects"].keys():
# .keys() shouldn't be needed
if obj not in objects_and_peoples["objects"]:
print(
f"Warning: {obj} is not in the list of objects the model can detect!"
)
@ -153,7 +146,6 @@ def process_footage(
notification_window=notification_window,
ntfy_url=ntfy_url,
)
# To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
frame_to_show = plot_label(
@ -233,26 +225,27 @@ def recognize_face(
no_remove_representations: bool = False,
) -> np.ndarray:
"""
Accepts a path to a directory of images of faces to be used as a refference
In addition, accepts an opencv image to be used as the frame to be searched
Accepts a path to a directory of images of faces to be used as a refference
In addition, accepts an opencv image to be used as the frame to be searched
Returns a single dictonary as currently only 1 face can be detected in each frame
Cosine threshold is 0.3, so if the confidence is less than that, it will return None
dict contains the following keys: label, x1, y1, x2, y2
The directory should be structured as follows:
faces/
name/
image1.jpg
image2.jpg
image3.jpg
name2/
image1.jpg
image2.jpg
image3.jpg
(not neccessarily jpgs, but you get the idea)
Returns a single dictonary as currently only 1 face can be detected in each frame
Cosine threshold is 0.3, so if the confidence is less than that, it will return None
dict conta # Maybe use os.exit() instead?
ins the following keys: label, x1, y1, x2, y2
The directory should be structured as follows:
faces/
name/
image1.jpg
image2.jpg
image3.jpg
name2/
image1.jpg
image2.jpg
image3.jpg
(not neccessarily jpgs, but you get the idea)
Point is, `name` is the name of the person in the images in the directory `name`
That name will be used as the label for the face in the frame
Point is, `name` is the name of the person in the images in the directory `name`
That name will be used as the label for the face in the frame
"""
global first_face_try
@ -285,8 +278,11 @@ def recognize_face(
model_name="ArcFace",
detector_backend="opencv",
)
except (ValueError) as e:
'''
Example dataframe, for reference
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \\_('_')_/)
'''
except ValueError as e:
if (
str(e)
== "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False." # noqa: E501
@ -295,11 +291,12 @@ def recognize_face(
return None
elif (
# Check if the error message contains "Validate .jpg or .png files exist in this path."
"Validate .jpg or .png files exist in this path." in str(e)
"Validate .jpg or .png files exist in this path."
in str(e)
):
# If a verbose/silent flag is added, this should be changed to print only if verbose is true
# print("No faces found in database")
return None
return None
else:
raise e
# Iteate over the dataframes
@ -338,8 +335,3 @@ def recognize_face(
)
return to_return
return None
"""
Example dataframe, for reference
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
"""