wyzely-detect/set-detect-notify/__main__.py

221 lines
8.5 KiB
Python
Raw Normal View History

2023-10-02 01:56:40 +01:00
# import face_recognition
import cv2
import numpy as np
import dotenv
from pathlib import Path
import os
import time
2023-10-05 03:03:11 +01:00
2023-10-02 01:56:40 +01:00
# import hjson as json
import torch
from ultralytics import YOLO
import argparse
from .utils import notify, config_utils
# strftime-style timestamp pattern (date and time to the second).
# Not referenced anywhere in the visible part of this file.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Parsed command-line arguments; None until main() assigns the argparse result.
args = None
2023-10-05 03:03:11 +01:00
# Per-class detection bookkeeping, keyed by the class name reported by the model.
# Starts empty; main() fills it from the model's class names the first time
# results are processed.
object_names = {}
2023-10-02 01:56:40 +01:00
# Seconds without a sighting after which a class is treated as newly detected.
_DETECTION_TIMEOUT_S = 15
# Seconds of accumulated detection required before a follow-up notification.
_SUSTAINED_DETECTION_S = 2
# Minimum number of seconds between follow-up notifications for one class.
_NOTIFICATION_COOLDOWN_S = 15


def _env_default(name, fallback):
    """Return environment variable *name* if it is set and non-empty, else *fallback*."""
    value = os.environ.get(name, "")
    return value if value != "" else fallback


def _build_argparser():
    """Create the command-line parser; every option can also come from the environment."""
    argparser = argparse.ArgumentParser(
        prog="Detect It",
        description="Detect it all!",
        epilog=":)",
    )
    # String defaults taken from the environment are converted by argparse
    # using the option's ``type``, exactly like command-line values.
    argparser.add_argument(
        "--run-scale",
        default=_env_default("RUN_SCALE", 0.25),
        type=float,
        help="The scale to run the detection at, default is 0.25",
    )

    stream_source = argparser.add_mutually_exclusive_group()
    stream_source.add_argument(
        "--url",
        default=_env_default("URL", None),
        type=str,
        help="The URL of the stream to use",
    )
    stream_source.add_argument(
        "--capture-device",
        default=_env_default("CAPTURE_DEVICE", 0),
        type=int,
        help="The capture device to use. Can also be a url.",
    )

    notifcation_services = argparser.add_argument_group("Notification Services")
    notifcation_services.add_argument(
        "--ntfy-url",
        default=_env_default("NTFY_URL", "https://ntfy.sh/set-detect-notify"),
        type=str,
        help="The URL to send notifications to",
    )
    return argparser


def _send_detection_notification(class_id, url):
    """Send the standard "<class> Detected" ntfy notification to *url*."""
    headers = notify.construct_ntfy_headers(
        title=f"{class_id} Detected",
        tag="rotating_light",
        priority="default",
    )
    notify.send_notification(data=f"{class_id} Detected", headers=headers, url=url)


def _track_detection(class_id, state, ntfy_url):
    """Update the bookkeeping for one sighting of *class_id* and notify if due.

    *state* is the per-class dict stored in ``object_names`` and is mutated in
    place. A notification is sent on the first sighting (or the first one after
    ``_DETECTION_TIMEOUT_S`` seconds of absence), and again once the class has
    been seen for ``_SUSTAINED_DETECTION_S`` accumulated seconds, at most once
    per ``_NOTIFICATION_COOLDOWN_S`` seconds.
    """
    now = time.time()
    last_seen = state["last_detection_time"]

    if (
        last_seen is None
        or now - last_seen > _DETECTION_TIMEOUT_S
        or state["detection_duration"] is None
    ):
        print(f"First detection of {class_id} in session")
        state["last_detection_time"] = now
        state["detection_duration"] = 0
        _send_detection_notification(class_id, ntfy_url)
        return

    # Accumulate only the interval since the previous sighting, then move the
    # marker forward. Without refreshing the marker every frame would re-add
    # the whole time since the first sighting, and the timeout above would
    # fire even while the object is continuously in view.
    state["detection_duration"] += now - last_seen
    state["last_detection_time"] = now

    if state["detection_duration"] < _SUSTAINED_DETECTION_S:
        return

    last_notified = state["last_notification_time"]
    if last_notified is None or now - last_notified > _NOTIFICATION_COOLDOWN_S:
        state["last_notification_time"] = now
        print(f"Detected {class_id} for {_SUSTAINED_DETECTION_S} seconds")
        _send_detection_notification(class_id, ntfy_url)
        # Start a fresh accumulation window after notifying.
        state["detection_duration"] = 0


def main():
    """Run the detect-and-notify loop until 'q' is pressed or the source fails.

    Loads an optional ``.env`` file, parses the command line into the
    module-level ``args``, loads the YOLO model, then reads frames from the
    chosen URL or capture device. Each frame is downscaled by
    ``args.run_scale``, run through the model, and every detected box updates
    the per-class state in the module-level ``object_names``. The annotated
    frame is shown in a window named "View".
    """
    global object_names
    global args

    if Path(".env").is_file():
        dotenv.load_dotenv()
        print("Loaded .env file")
    else:
        print("No .env file found")

    args = _build_argparser().parse_args()

    # Check if a CUDA GPU is available. If it is, set it via torch. If not, use the CPU.
    # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
    for i in range(torch.cuda.device_count()):
        print(torch.cuda.get_device_properties(i).name)
    if torch.cuda.is_available():
        torch.cuda.set_device(0)
        print("Set CUDA device")
    else:
        print("No CUDA device available, using CPU")

    model = YOLO("yolov8n.pt")

    # A stream URL, when given, takes precedence over the capture device index.
    source = args.url if args.url else args.capture_device
    video_capture = cv2.VideoCapture(source)
    try:
        # Keep only the most recent frame buffered to eliminate lag; the
        # displayed video may look choppy as a result.
        video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        print(
            f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}"  # noqa: E501
        )
        print("Beginning video capture...")

        while True:
            ret, frame = video_capture.read()
            if not ret or frame is None:
                # A failed read yields no image; resizing it would raise.
                print("Failed to read a frame from the video source, stopping")
                break

            # Downscale the frame for faster inference.
            run_frame = cv2.resize(
                frame, (0, 0), fx=args.run_scale, fy=args.run_scale
            )
            results = model(run_frame, verbose=False)

            for r in results:
                # Lazily create one state record per class the model knows.
                if not object_names:
                    for name in r.names.values():
                        object_names[name] = {
                            "last_detection_time": None,
                            "detection_duration": None,
                            "last_notification_time": None,
                        }

                for box in r.boxes:
                    # The class index comes back as a float; the name table
                    # is looked up with the integer index.
                    class_id = r.names[int(box.cls[0].item())]
                    _track_detection(
                        class_id, object_names[class_id], args.ntfy_url
                    )

                cv2.imshow("View", r.plot())

            # Hit 'q' on the keyboard to quit!
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Release the capture handle and windows even if the loop raised.
        print("Releasing video capture")
        video_capture.release()
        cv2.destroyAllWindows()
2023-10-05 03:03:11 +01:00
# Entry point: called unconditionally at module load (there is no
# ``if __name__ == "__main__"`` guard), so importing this file also starts it.
main()