wyzely-detect/wyzely_detect/__main__.py

323 lines
12 KiB
Python
Raw Permalink Normal View History

2023-10-02 01:56:40 +01:00
# import face_recognition
import cv2
import dotenv
from pathlib import Path
import os
2023-10-05 03:03:11 +01:00
2023-10-02 01:56:40 +01:00
# import hjson as json
import torch
from ultralytics import YOLO
import argparse
from .utils import notify
from .utils import utils
2023-10-02 01:56:40 +01:00
# strftime-style layout for timestamps: "YYYY-MM-DD HH:MM:SS".
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Parsed command-line arguments; stays None until main() assigns it.
args = None

# Detection bookkeeping shared with notify.thing_detected(), which receives
# this dict and returns the updated version. main() fills "objects" with one
# entry per class name reported by the model.
objects_and_peoples = {
    "objects": {},
    "peoples": {},
}
2023-10-05 03:03:11 +01:00
2023-10-02 01:56:40 +01:00
# Environment values that switch a boolean flag OFF when set explicitly.
_FALSEY_ENV_VALUES = frozenset({"0", "false", "no", "off"})


def _env_default(name, fallback):
    """Return environment variable *name*, or *fallback* if unset or empty.

    The value is returned as a string; argparse converts string defaults
    with the argument's ``type`` callable, so no conversion is done here.
    """
    return os.environ.get(name) or fallback


def _env_flag(name):
    """Interpret environment variable *name* as a boolean switch.

    Unset, empty, or explicitly false-like values ("0", "false", "no",
    "off", case-insensitive) give False; any other value gives True.
    """
    raw = os.environ.get(name, "").strip().lower()
    return bool(raw) and raw not in _FALSEY_ENV_VALUES


def _build_arg_parser():
    """Create the command-line parser, taking defaults from the environment.

    Must be called after any ``.env`` file has been loaded, because the
    defaults are read from ``os.environ`` when the parser is built.
    """
    argparser = argparse.ArgumentParser(
        prog="Wyzely Detect",
        description="Recognize faces/objects in a video stream (from a webcam or a security camera) and send notifications to your devices",  # noqa: E501
        epilog=":)",
    )

    argparser.add_argument(
        "--run-scale",
        default=_env_default("RUN_SCALE", 1),
        type=float,
        help="The scale to run the detection at (default: %(default)s)",
    )
    argparser.add_argument(
        "--view-scale",
        default=_env_default("VIEW_SCALE", 1),
        type=float,
        help="The scale to view the detection at (default: %(default)s)",
    )
    argparser.add_argument(
        "--no-display",
        # store_true performs no type conversion, so the environment value
        # has to be turned into a real bool here; otherwise the string
        # "false" would be truthy and silently disable the display.
        default=_env_flag("NO_DISPLAY"),
        action="store_true",
        help="Don't display the video feed",
    )
    argparser.add_argument(
        "--confidence-threshold",
        default=_env_default("CONFIDENCE_THRESHOLD", 0.6),
        type=float,
        help="The confidence threshold to use",
    )
    argparser.add_argument(
        "--faces-directory",
        default=_env_default("FACES_DIRECTORY", "faces"),
        type=str,
        help="The directory to store the faces. Can either contain images or subdirectories with images, the latter being the preferred method",  # noqa: E501
    )
    argparser.add_argument(
        "--detect-object",
        nargs="*",
        default=[],
        type=str,
        help="The object(s) to detect. Must be something the model is trained to detect",
    )

    stream_source = argparser.add_mutually_exclusive_group()
    stream_source.add_argument(
        "--url",
        default=_env_default("URL", None),
        type=str,
        help="The URL of the stream to use",
    )
    stream_source.add_argument(
        "--capture-device",
        default=_env_default("CAPTURE_DEVICE", 0),
        type=int,
        help="The index of the capture device to use. For a stream, use --url instead",
    )

    # Defaults for the stuff here and down are already set in notify.py.
    # Setting them here just means that argparse will display them as defaults.
    # TODO: Perhaps just remove the default parameter and just add to the help message that the default is set is x
    # TODO: Make ntfy optional in ntfy.py. Currently, unless there is a local or LAN instance of ntfy, this can't run offline
    notification_services = argparser.add_argument_group("Notification Services")
    notification_services.add_argument(
        "--ntfy-url",
        default=_env_default("NTFY_URL", "https://ntfy.sh/wyzely-detect"),
        type=str,
        help="The URL to send notifications to",
    )

    timers = argparser.add_argument_group("Timers")
    timers.add_argument(
        "--detection-duration",
        default=_env_default("DETECTION_DURATION", 2),
        type=int,
        help="The duration (in seconds) that an object must be detected for before sending a notification",
    )
    timers.add_argument(
        "--detection-window",
        default=_env_default("DETECTION_WINDOW", 15),
        type=int,
        help="The time (seconds) before the detection duration resets",
    )
    timers.add_argument(
        "--notification-window",
        default=_env_default("NOTIFICATION_WINDOW", 30),
        type=int,
        help="The time (seconds) before another notification can be sent",
    )
    return argparser


def main():
    """Run the capture / detect / notify loop until 'q' is pressed.

    Loads ``.env`` if present, parses the command line into the module-level
    ``args``, opens the video source (``--url`` takes precedence over
    ``--capture-device``), then for every frame runs the YOLO model and,
    when the faces directory exists, face recognition. Each accepted
    detection is passed to ``notify.thing_detected`` and the labelled frame
    is shown unless ``--no-display`` is set. The loop also ends when a frame
    can no longer be read. The capture device and any windows are released
    on exit, including on an exception or Ctrl-C.
    """
    global objects_and_peoples
    global args

    # RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu

    # Load .env before building the parser: its defaults read os.environ.
    if Path(".env").is_file():
        dotenv.load_dotenv()
        print("Loaded .env file")
    else:
        print("No .env file found")

    args = _build_arg_parser().parse_args()

    # Check if a CUDA GPU is available. If it is, set it via torch. If not, use the CPU
    # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
    # Currently, I have been unable to set up Poetry to use GPU for Torch
    for device_index in range(torch.cuda.device_count()):
        print(torch.cuda.get_device_properties(device_index).name)
    if torch.cuda.is_available():
        torch.cuda.set_device(0)
        print("Set CUDA device")
    else:
        print("No CUDA device available, using CPU")

    model = YOLO("yolov8n.pt")

    # Depending on if the user wants to use a stream or a capture device,
    # set the video capture to the appropriate source
    video_capture = cv2.VideoCapture(args.url if args.url else args.capture_device)

    try:
        # Eliminate lag by setting the buffer size to 1
        # This makes it so that the video capture will only grab the most recent frame
        # However, this means that the video may be choppy
        video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        # Print the resolution of the video
        print(
            f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}"  # noqa: E501
        )
        print("Beginning video capture...")

        path_to_faces = Path(args.faces_directory)

        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret or frame is None:
                # A failed read yields no frame; resizing None would raise,
                # so stop cleanly instead.
                print("Could not read a frame from the video source, stopping")
                break

            # Resize frame of video to a smaller size for faster processing
            run_frame = cv2.resize(
                frame, (0, 0), fx=args.run_scale, fy=args.run_scale
            )

            results = model(run_frame, verbose=False)

            # Re-checked every frame so a directory created while running
            # is picked up.
            path_to_faces_exists = path_to_faces.is_dir()

            for result_index, r in enumerate(results):
                # list of dicts with each dict containing a label, x1, y1, x2, y2
                plot_boxes = []

                # The following is stuff for people.
                # This is still in the for loop as each result, no matter if
                # anything is detected, will be present.
                if path_to_faces_exists:
                    if face_details := utils.recognize_face(
                        path_to_directory=path_to_faces, run_frame=run_frame
                    ):
                        plot_boxes.append(face_details)
                        objects_and_peoples = notify.thing_detected(
                            thing_name=face_details["label"],
                            objects_and_peoples=objects_and_peoples,
                            detection_type="peoples",
                            detection_window=args.detection_window,
                            detection_duration=args.detection_duration,
                            notification_window=args.notification_window,
                            ntfy_url=args.ntfy_url,
                        )

                # The following is stuff for objects.
                # One-time setup of the per-class tracking entries. The
                # truthiness test covers both an empty dict and None.
                if not objects_and_peoples["objects"]:
                    objects_and_peoples["objects"] = {
                        name: {
                            "last_detection_time": None,
                            "detection_duration": None,
                            # "first_detection_time": None,
                            "last_notification_time": None,
                        }
                        for name in r.names.values()
                    }
                    # Warn about requested objects the model does not know.
                    # The lookup must be against the class-name table, not
                    # the top-level dict (whose keys are "objects"/"peoples").
                    for obj in args.detect_object:
                        if obj not in objects_and_peoples["objects"]:
                            print(
                                f"Warning: {obj} is not in the list of objects the model can detect!"
                            )

                for box in r.boxes:
                    # Get the name of the object
                    class_id = r.names[box.cls[0].item()]
                    # Get the coordinates of the object
                    cords = [round(x) for x in box.xyxy[0].tolist()]
                    # Get the confidence
                    conf = round(box.conf[0].item(), 2)

                    # An empty --detect-object list means "accept every class".
                    wanted = not args.detect_object or class_id in args.detect_object
                    if conf < args.confidence_threshold or not wanted:
                        continue

                    # Add the object to the list of objects to plot
                    plot_boxes.append(
                        {
                            "label": class_id,
                            "x1": cords[0],
                            "y1": cords[1],
                            "x2": cords[2],
                            "y2": cords[3],
                        }
                    )

                    objects_and_peoples = notify.thing_detected(
                        thing_name=class_id,
                        objects_and_peoples=objects_and_peoples,
                        detection_type="objects",
                        detection_window=args.detection_window,
                        detection_duration=args.detection_duration,
                        notification_window=args.notification_window,
                        ntfy_url=args.ntfy_url,
                    )

                # To debug plotting, use r.plot() to cross reference the bounding
                # boxes drawn by the plot_label() and r.plot()
                frame_to_show = utils.plot_label(
                    boxes=plot_boxes,
                    full_frame=frame,
                    # full_frame=r.plot(),
                    run_scale=args.run_scale,
                    view_scale=args.view_scale,
                )

                # Display the resulting frame
                if not args.no_display:
                    cv2.imshow(f"Video{result_index}", frame_to_show)

            # Hit 'q' on the keyboard to quit!
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Release handle to the webcam
        print("Releasing video capture")
        video_capture.release()
        cv2.destroyAllWindows()
2023-10-15 01:25:27 +01:00
2023-10-14 21:46:42 +01:00
# Run the detector only when executed as a script / `python -m`, not on import.
if __name__ == "__main__":
    main()