Compare commits

..

8 Commits

Author SHA1 Message Date
slashtechno f7f5db9f41
Updated TODO comments 2024-02-16 13:17:43 -06:00
deepsource-autofix[bot] 835e19ed18
refactor: autofix issues in 1 file
Resolved issues in wyzely_detect/utils/utils.py with DeepSource Autofix
2024-02-16 19:13:48 +00:00
slashtechno 4285be54b7
Replaced string with multi-line comment 2024-02-16 13:11:41 -06:00
slashtechno 37d39d434f
Fixed the following:
* Unused global var `objects_and_peoples` (`__main.py__`)
* Random `print` (`__main.py__`)
* Unescaped backslash (`utils.py`)
2024-02-16 13:05:27 -06:00
slashtechno c7d488d993
Fix various minor issues and format code 2024-02-16 13:01:02 -06:00
slashtechno b48edef250
Python `3.10.12` seems to work well 2024-02-16 12:48:52 -06:00
slashtechno bbcede0b3e
Check if video source is valid 2024-02-16 12:45:31 -06:00
slashtechno 8f500e0186
Multiple sources can now be used
Added `--fake-second-source` to help with debugging
2024-02-16 12:34:39 -06:00
5 changed files with 131 additions and 100 deletions

View File

@ -1 +1 @@
3.11.5 3.10.12

2
.vscode/launch.json vendored
View File

@ -10,7 +10,7 @@
"request": "launch", "request": "launch",
"module": "wyzely_detect", "module": "wyzely_detect",
"args": [ "args": [
"--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations" "--run-scale", "0.25", "--view-scale", "0.5", "--no-remove-representations", "--fake-second-source"
], ],
"justMyCode": true "justMyCode": true
}, },

View File

@ -1,8 +1,7 @@
# import face_recognition # import face_recognition
from pathlib import Path from pathlib import Path
import cv2 import cv2
import os import sys
from prettytable import PrettyTable from prettytable import PrettyTable
# import hjson as json # import hjson as json
@ -17,17 +16,15 @@ args = None
def main(): def main():
global objects_and_peoples
global args global args
args = argparser.parse_args() args = argparser.parse_args()
# Check if a CUDA GPU is available. If it is, set it via torch. If not, set it to cpu # Check if a CUDA GPU is available. If it is, set it via torch. If not, set it to cpu
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168 # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
# Currently, I have been unable to set up Poetry to use GPU for Torch # Currently, I have been unable to set up Poetry to use GPU for Torch
for i in range(torch.cuda.device_count()): for i in range(torch.cuda.device_count()):
print(f'Using {torch.cuda.get_device_properties(i).name} for pytorch') print(f"Using {torch.cuda.get_device_properties(i).name} for pytorch")
if torch.cuda.is_available(): if torch.cuda.is_available():
torch.cuda.set_device(0) torch.cuda.set_device(0)
print("Set CUDA device") print("Set CUDA device")
@ -38,9 +35,10 @@ def main():
if args.force_disable_tensorflow_gpu: if args.force_disable_tensorflow_gpu:
print("Forcing tensorflow to use CPU") print("Forcing tensorflow to use CPU")
import tensorflow as tf import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
if tf.config.experimental.list_logical_devices('GPU'): tf.config.set_visible_devices([], "GPU")
print('GPU disabled unsuccessfully') if tf.config.experimental.list_logical_devices("GPU"):
print("GPU disabled unsuccessfully")
else: else:
print("GPU disabled successfully") print("GPU disabled successfully")
@ -50,15 +48,24 @@ def main():
# Set the video capture to the appropriate source # Set the video capture to the appropriate source
if not args.rtsp_url and not args.capture_device: if not args.rtsp_url and not args.capture_device:
print("No stream or capture device set, defaulting to capture device 0") print("No stream or capture device set, defaulting to capture device 0")
video_sources = { video_sources = {"devices": [cv2.VideoCapture(0)]}
"devices": [cv2.VideoCapture(0)]
}
else: else:
video_sources = { video_sources = {
"streams": [cv2.VideoCapture(url) for url in args.rtsp_url], "streams": [cv2.VideoCapture(url) for url in args.rtsp_url],
"devices": [cv2.VideoCapture(device) for device in args.capture_device], "devices": [cv2.VideoCapture(device) for device in args.capture_device],
} }
if args.fake_second_source:
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No capture device to use as second source. Trying stream.")
try:
video_sources["devices"].append(video_sources["devices"][0])
except KeyError:
print("No stream to use as a second source")
# When the code tries to resize the nonexistent capture device 1, the program will fail
# Eliminate lag by setting the buffer size to 1 # Eliminate lag by setting the buffer size to 1
# This makes it so that the video capture will only grab the most recent frame # This makes it so that the video capture will only grab the most recent frame
# However, this means that the video may be choppy # However, this means that the video may be choppy
@ -74,40 +81,54 @@ def main():
pretty_table = PrettyTable(field_names=["Source Type", "Resolution"]) pretty_table = PrettyTable(field_names=["Source Type", "Resolution"])
for source_type, sources in video_sources.items(): for source_type, sources in video_sources.items():
for source in sources: for source in sources:
if (
source.get(cv2.CAP_PROP_FRAME_WIDTH) == 0
or source.get(cv2.CAP_PROP_FRAME_HEIGHT) == 0
):
message = "Capture for a source failed as resolution is 0x0.\n"
if source_type == "streams":
message += "Check if the stream URL is correct and if the stream is online."
else:
message += "Check if the capture device is connected, working, and not in use by another program."
print(message)
sys.exit(1)
pretty_table.add_row( pretty_table.add_row(
[source_type, f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}"] [
source_type,
f"{source.get(cv2.CAP_PROP_FRAME_WIDTH)}x{source.get(cv2.CAP_PROP_FRAME_HEIGHT)}",
]
) )
print(pretty_table) print(pretty_table)
print
print("Beginning video capture...") print("Beginning video capture...")
while True: while True:
# Grab a single frame of video # Grab a single frame of video
ret, frame = video_capture.read() frames = []
# frames = [source.read() for sources in video_sources.values() for source in sources]
frame_to_show = utils.process_footage( for list_of_sources in video_sources.values():
frame = frame, frames.extend([source.read()[1] for source in list_of_sources])
run_scale = args.run_scale, frames_to_show = []
view_scale = args.view_scale, for frame in frames:
frames_to_show.append(
faces_directory=Path(args.faces_directory), utils.process_footage(
face_confidence_threshold=args.face_confidence_threshold, frame=frame,
no_remove_representations=args.no_remove_representations, run_scale=args.run_scale,
view_scale=args.view_scale,
detection_window=args.detection_window, faces_directory=Path(args.faces_directory),
detection_duration=args.detection_duration, face_confidence_threshold=args.face_confidence_threshold,
notification_window=args.notification_window, no_remove_representations=args.no_remove_representations,
detection_window=args.detection_window,
ntfy_url=args.ntfy_url, detection_duration=args.detection_duration,
notification_window=args.notification_window,
model=model, ntfy_url=args.ntfy_url,
detect_object=args.detect_object, model=model,
object_confidence_threshold=args.object_confidence_threshold, detect_object=args.detect_object,
) object_confidence_threshold=args.object_confidence_threshold,
)
)
# Display the resulting frame # Display the resulting frame
# TODO: When multi-camera support is added, this needs to be changed to allow all feeds
if not args.no_display: if not args.no_display:
cv2.imshow("Video", frame_to_show) for i, frame_to_show in enumerate(frames_to_show):
cv2.imshow(f"Video {i}", frame_to_show)
# Hit 'q' on the keyboard to quit! # Hit 'q' on the keyboard to quit!
if cv2.waitKey(1) & 0xFF == ord("q"): if cv2.waitKey(1) & 0xFF == ord("q"):
@ -115,7 +136,7 @@ def main():
# Release handle to the webcam # Release handle to the webcam
print("Releasing video capture") print("Releasing video capture")
video_capture.release() [source.release() for sources in video_sources.values() for source in sources]
cv2.destroyAllWindows() cv2.destroyAllWindows()

View File

@ -15,16 +15,14 @@ def set_argparse():
else: else:
print("No .env file found") print("No .env file found")
# One important thing to consider is that most function parameters are optional and have a default value # One important thing to consider is that most function parameters are optional and have a default value
# However, with argparse, those are never used since a argparse always passes something, even if it's None # However, with argparse, those are never used since a argparse always passes something, even if it's None
argparser = argparse.ArgumentParser( argparser = argparse.ArgumentParser(
prog="Wyzely Detect", prog="Wyzely Detect",
description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices", # noqa: E501 description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices", # noqa: E501
epilog=":)", epilog="For env bool options, setting them to anything except for an empty string will enable them.",
) )
video_options = argparser.add_argument_group("Video Options") video_options = argparser.add_argument_group("Video Options")
stream_source = video_options.add_mutually_exclusive_group() stream_source = video_options.add_mutually_exclusive_group()
stream_source.add_argument( stream_source.add_argument(
@ -32,7 +30,9 @@ def set_argparse():
action="append", action="append",
# If RTSP_URL is in the environment, use it, otherwise just use a blank list # If RTSP_URL is in the environment, use it, otherwise just use a blank list
# This may cause problems down the road, but if it does, env for this can be removed # This may cause problems down the road, but if it does, env for this can be removed
default=[os.environ["RTSP_URL"]] if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != "" else [], default=[os.environ["RTSP_URL"]]
if "RTSP_URL" in os.environ and os.environ["RTSP_URL"] != ""
else [],
type=str, type=str,
help="RTSP camera URL", help="RTSP camera URL",
) )
@ -41,7 +41,9 @@ def set_argparse():
action="append", action="append",
# If CAPTURE_DEVICE is in the environment, use it, otherwise just use a blank list # If CAPTURE_DEVICE is in the environment, use it, otherwise just use a blank list
# If __main__.py detects that no capture device or remote stream is set, it will default to 0 # If __main__.py detects that no capture device or remote stream is set, it will default to 0
default=[int(os.environ["CAPTURE_DEVICE"])] if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != "" else [], default=[int(os.environ["CAPTURE_DEVICE"])]
if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
else [],
type=int, type=int,
help="Capture device number", help="Capture device number",
) )
@ -69,16 +71,20 @@ def set_argparse():
video_options.add_argument( video_options.add_argument(
"--no-display", "--no-display",
default=os.environ["NO_DISPLAY"] default=os.environ["NO_DISPLAY"]
if "NO_DISPLAY" in os.environ and os.environ["NO_DISPLAY"] != "" if "NO_DISPLAY" in os.environ
and os.environ["NO_DISPLAY"] != ""
and os.environ["NO_DISPLAY"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Don't display the video feed", help="Don't display the video feed",
) )
video_options.add_argument( video_options.add_argument(
'-c', "-c",
'--force-disable-tensorflow-gpu', "--force-disable-tensorflow-gpu",
default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] default=os.environ["FORCE_DISABLE_TENSORFLOW_GPU"]
if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != "" if "FORCE_DISABLE_TENSORFLOW_GPU" in os.environ
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"] != ""
and os.environ["FORCE_DISABLE_TENSORFLOW_GPU"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot", help="Force disable tensorflow GPU through env since sometimes it's not worth it to install cudnn and whatnot",
@ -122,7 +128,6 @@ def set_argparse():
help="The time (seconds) before another notification can be sent", help="The time (seconds) before another notification can be sent",
) )
face_recognition = argparser.add_argument_group("Face Recognition options") face_recognition = argparser.add_argument_group("Face Recognition options")
face_recognition.add_argument( face_recognition.add_argument(
"--faces-directory", "--faces-directory",
@ -146,13 +151,12 @@ def set_argparse():
default=os.environ["NO_REMOVE_REPRESENTATIONS"] default=os.environ["NO_REMOVE_REPRESENTATIONS"]
if "NO_REMOVE_REPRESENTATIONS" in os.environ if "NO_REMOVE_REPRESENTATIONS" in os.environ
and os.environ["NO_REMOVE_REPRESENTATIONS"] != "" and os.environ["NO_REMOVE_REPRESENTATIONS"] != ""
and os.environ["NO_REMOVE_REPRESENTATIONS"].lower() != "false"
else False, else False,
action="store_true", action="store_true",
help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created", # noqa: E501 help="Don't remove representations_<model>.pkl at the start of the program. Greatly improves startup time, but doesn't take into account changes to the faces directory since it was created", # noqa: E501
) )
object_detection = argparser.add_argument_group("Object Detection options") object_detection = argparser.add_argument_group("Object Detection options")
object_detection.add_argument( object_detection.add_argument(
"--detect-object", "--detect-object",
@ -167,11 +171,25 @@ def set_argparse():
default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"] default=os.environ["OBJECT_CONFIDENCE_THRESHOLD"]
if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ if "OBJECT_CONFIDENCE_THRESHOLD" in os.environ
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != "" and os.environ["OBJECT_CONFIDENCE_THRESHOLD"] != ""
else 0.6, # I think this should always be a str so using lower shouldn't be a problem.
# Also, if the first check fails the rest shouldn't be run
and os.environ["OBJECT_CONFIDENCE_THRESHOLD"].lower() != "false" else 0.6,
type=float, type=float,
help="The confidence threshold to use", help="The confidence threshold to use",
) )
debug = argparser.add_argument_group("Debug options")
debug.add_argument(
"--fake-second-source",
help="Duplicate the first source and use it as a second source. Capture device takes priority.",
action="store_true",
default=os.environ["FAKE_SECOND_SOURCE"]
if "FAKE_SECOND_SOURCE" in os.environ
and os.environ["FAKE_SECOND_SOURCE"] != ""
and os.environ["FAKE_SECOND_SOURCE"].lower() != "false"
else False,
)
# return argparser # return argparser

View File

@ -2,16 +2,17 @@ import cv2
import os import os
import numpy as np import numpy as np
from pathlib import Path from pathlib import Path
# https://stackoverflow.com/a/42121886/18270659 # https://stackoverflow.com/a/42121886/18270659
os.environ['TF_CPP_MIN_LOG_LEVEL']='3' os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
from deepface import DeepFace # noqa: E402 from deepface import DeepFace # noqa: E402
from . import notify # noqa: E402 from . import notify # noqa: E402
first_face_try = True first_face_try = True
# TODO: When multi-camera support is added, this will need to be changed so that each camera has its own dict # TODO: When multi-camera support is ~~added~~ improved, this will need to be changed so that each camera has its own dict
objects_and_peoples = { objects_and_peoples = {
"objects": {}, "objects": {},
"peoples": {}, "peoples": {},
@ -21,36 +22,27 @@ objects_and_peoples = {
def process_footage( def process_footage(
# Frame # Frame
frame: np.ndarray = None, frame: np.ndarray = None,
# scale # scale
run_scale: float = None, run_scale: float = None,
view_scale: float = None, view_scale: float = None,
# Face stuff # Face stuff
faces_directory: str = None, faces_directory: str = None,
face_confidence_threshold: float = None, face_confidence_threshold: float = None,
no_remove_representations: bool = False, no_remove_representations: bool = False,
# Timer stuff # Timer stuff
detection_window: int = None, detection_window: int = None,
detection_duration: int = None, detection_duration: int = None,
notification_window: int = None, notification_window: int = None,
ntfy_url: str = None, ntfy_url: str = None,
# Object stuff # Object stuff
# YOLO object # YOLO object
model = None, model=None,
detect_object: list = None, detect_object: list = None,
object_confidence_threshold = None object_confidence_threshold=None,
) -> np.ndarray: ) -> np.ndarray:
""" """Takes in a frame and processes it"""
Takes in a frame and processes it
"""
global objects_and_peoples global objects_and_peoples
# Resize frame of video to a smaller size for faster recognition processing # Resize frame of video to a smaller size for faster recognition processing
run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale) run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale)
# view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale) # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
@ -60,7 +52,7 @@ def process_footage(
path_to_faces = Path(faces_directory) path_to_faces = Path(faces_directory)
path_to_faces_exists = path_to_faces.is_dir() path_to_faces_exists = path_to_faces.is_dir()
for i, r in enumerate(results): for r in results:
# list of dicts with each dict containing a label, x1, y1, x2, y2 # list of dicts with each dict containing a label, x1, y1, x2, y2
plot_boxes = [] plot_boxes = []
@ -105,7 +97,8 @@ def process_footage(
# Also, make sure that the objects to detect are in the list of objects_and_peoples # Also, make sure that the objects to detect are in the list of objects_and_peoples
# If it isn't, print a warning # If it isn't, print a warning
for obj in detect_object: for obj in detect_object:
if obj not in objects_and_peoples["objects"].keys(): # .keys() shouldn't be needed
if obj not in objects_and_peoples["objects"]:
print( print(
f"Warning: {obj} is not in the list of objects the model can detect!" f"Warning: {obj} is not in the list of objects the model can detect!"
) )
@ -154,7 +147,6 @@ def process_footage(
ntfy_url=ntfy_url, ntfy_url=ntfy_url,
) )
# To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot() # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
frame_to_show = plot_label( frame_to_show = plot_label(
boxes=plot_boxes, boxes=plot_boxes,
@ -233,26 +225,27 @@ def recognize_face(
no_remove_representations: bool = False, no_remove_representations: bool = False,
) -> np.ndarray: ) -> np.ndarray:
""" """
Accepts a path to a directory of images of faces to be used as a refference Accepts a path to a directory of images of faces to be used as a refference
In addition, accepts an opencv image to be used as the frame to be searched In addition, accepts an opencv image to be used as the frame to be searched
Returns a single dictonary as currently only 1 face can be detected in each frame Returns a single dictonary as currently only 1 face can be detected in each frame
Cosine threshold is 0.3, so if the confidence is less than that, it will return None Cosine threshold is 0.3, so if the confidence is less than that, it will return None
dict contains the following keys: label, x1, y1, x2, y2 dict conta # Maybe use os.exit() instead?
The directory should be structured as follows: ins the following keys: label, x1, y1, x2, y2
faces/ The directory should be structured as follows:
name/ faces/
image1.jpg name/
image2.jpg image1.jpg
image3.jpg image2.jpg
name2/ image3.jpg
image1.jpg name2/
image2.jpg image1.jpg
image3.jpg image2.jpg
(not neccessarily jpgs, but you get the idea) image3.jpg
(not neccessarily jpgs, but you get the idea)
Point is, `name` is the name of the person in the images in the directory `name` Point is, `name` is the name of the person in the images in the directory `name`
That name will be used as the label for the face in the frame That name will be used as the label for the face in the frame
""" """
global first_face_try global first_face_try
@ -285,8 +278,11 @@ def recognize_face(
model_name="ArcFace", model_name="ArcFace",
detector_backend="opencv", detector_backend="opencv",
) )
'''
except (ValueError) as e: Example dataframe, for reference
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \\_('_')_/)
'''
except ValueError as e:
if ( if (
str(e) str(e)
== "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False." # noqa: E501 == "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False." # noqa: E501
@ -295,7 +291,8 @@ def recognize_face(
return None return None
elif ( elif (
# Check if the error message contains "Validate .jpg or .png files exist in this path." # Check if the error message contains "Validate .jpg or .png files exist in this path."
"Validate .jpg or .png files exist in this path." in str(e) "Validate .jpg or .png files exist in this path."
in str(e)
): ):
# If a verbose/silent flag is added, this should be changed to print only if verbose is true # If a verbose/silent flag is added, this should be changed to print only if verbose is true
# print("No faces found in database") # print("No faces found in database")
@ -338,8 +335,3 @@ def recognize_face(
) )
return to_return return to_return
return None return None
"""
Example dataframe, for reference
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
"""