Added notification stuff

This commit is contained in:
slashtechno 2023-10-04 21:03:11 -05:00
parent dbf1b0ed63
commit a99a899417
Signed by: slashtechno
GPG Key ID: 8EC1D9D9286C2B17
4 changed files with 127 additions and 54 deletions

15
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Module",
"type": "python",
"request": "launch",
"module": "src",
"justMyCode": true
}
]
}

View File

@ -5,6 +5,7 @@ import dotenv
from pathlib import Path
import os
import time
# import hjson as json
import torch
from ultralytics import YOLO
@ -16,7 +17,11 @@ from .utils import notify, config_utils
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
args = None
object_names = {}
def main():
global object_names
global args
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
@ -32,14 +37,16 @@ def main():
epilog=":)",
)
# required='RUN_SCALE' not in os.environ,
# required='RUN_SCALE' not in os.environ,
argparser.add_argument(
'--run-scale',
# Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25
default=os.environ['RUN_SCALE'] if 'RUN_SCALE' in os.environ and os.environ['RUN_SCALE'] != '' else 0.25, # noqa: E501
type=float,
help="The scale to run the detection at, default is 0.25",
"--run-scale",
# Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25
default=os.environ["RUN_SCALE"]
if "RUN_SCALE" in os.environ and os.environ["RUN_SCALE"] != ""
else 0.25, # noqa: E501
type=float,
help="The scale to run the detection at, default is 0.25",
)
# argparser.add_argument(
# '--view-scale',
@ -49,7 +56,7 @@ def main():
# help="The scale to view the detection at, default is 0.75",
# )
stream_source = argparser.add_mutually_exclusive_group()
stream_source = argparser.add_mutually_exclusive_group()
# stream_source.add_argument(
# '--url',
# default=os.environ['URL'] if 'URL' in os.environ and os.environ['URL'] != '' else None, # noqa: E501
@ -57,16 +64,20 @@ def main():
# help="The URL of the stream to use",
# )
stream_source.add_argument(
'--capture-device',
default=os.environ['CAPTURE_DEVICE'] if 'CAPTURE_DEVICE' in os.environ and os.environ['CAPTURE_DEVICE'] != '' else 0, # noqa: E501
"--capture-device",
default=os.environ["CAPTURE_DEVICE"]
if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
else 0, # noqa: E501
type=int,
help="The capture device to use. Can also be a url."
help="The capture device to use. Can also be a url.",
)
notifcation_services = argparser.add_argument_group("Notification Services")
notifcation_services.add_argument(
'--ntfy-url',
default=os.environ['NTFY_URL'] if 'NTFY_URL' in os.environ and os.environ['NTFY_URL'] != '' else None, # noqa: E501
"--ntfy-url",
default=os.environ["NTFY_URL"]
if "NTFY_URL" in os.environ and os.environ["NTFY_URL"] != ""
else "https://ntfy.sh/set-detect-notify",
type=str,
help="The URL to send notifications to",
)
@ -75,25 +86,24 @@ def main():
# Check if a CUDA GPU is available. If it is, set it via torch. Ff not, set it to cpu
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
# Currently, I have been unable to set up Poetry to use GPU for Torch
# Currently, I have been unable to set up Poetry to use GPU for Torch
for i in range(torch.cuda.device_count()):
print(torch.cuda.get_device_properties(i).name)
if torch.cuda.is_available():
torch.cuda.set_device(0)
print("Set CUDA device")
else:
else:
print("No CUDA device available, using CPU")
model = YOLO("yolov8n.pt")
# video_capture = cv2.VideoCapture(args.capture_device)
video_capture = cv2.VideoCapture(args.capture_device)
video_capture = cv2.VideoCapture("rtsp://192.168.1.7:8554/cv")
# Eliminate lag by setting the buffer size to 1
# This makes it so that the video capture will only grab the most recent frame
# However, this means that the video may be choppy
video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Print the resolution of the video
print(
f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}" # noqa: E501
@ -107,9 +117,17 @@ def main():
# Resize frame of video to a smaller size for faster recognition processing
run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale)
# view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
results = model(run_frame, verbose=False)
for r in results:
# Setup dictionary of object names
if not object_names:
for name in r.names.values():
object_names[name] = {
"last_detection_time": None,
"detection_duration": None,
# "first_detection_time": None,
}
for box in r.boxes:
# Get the name of the object
class_id = r.names[box.cls[0].item()]
@ -119,10 +137,57 @@ def main():
# Get the confidence
conf = round(box.conf[0].item(), 2)
# Print it out, adding a spacer between each object
print("Object type:", class_id)
print("Coordinates:", cords)
print("Probability:", conf)
print("---")
# print("Object type:", class_id)
# print("Coordinates:", cords)
# print("Probability:", conf)
# print("---")
# Now do stuff
# If this is the first time the object has been detected
# or if it has been more than 15 seconds since the last detection
# reset the detection duration
if (
object_names[class_id]["last_detection_time"] is None
or time.time() - object_names[class_id]["last_detection_time"] > 15
or object_names[class_id]["detection_duration"] is None
):
print(f"First detection of {class_id}")
# time.time() returns the number of seconds since the epoch
object_names[class_id]["last_detection_time"] = time.time()
# object_names[class_id]["first_detection_time"] = time.time()
object_names[class_id]["detection_duration"] = 0
headers = notify.construct_ntfy_headers(
title=f"{class_id} Detected",
tag="rotating_light",
priority="default",
)
notify.send_notification(
data=f"{class_id} Detected", headers=headers, url=args.ntfy_url
)
else:
# Add the time since the last detection to the total detection duration
object_names[class_id]["detection_duration"] += (
time.time() - object_names[class_id]["last_detection_time"]
)
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
if (
object_names[class_id]["detection_duration"] >= 2
and time.time() - object_names[class_id]["last_detection_time"]
<= 15
):
print(f"Detected {class_id} for 2 seconds")
headers = notify.construct_ntfy_headers(
title=f"{class_id} Detected",
tag="rotating_light",
priority="default",
)
notify.send_notification(
data=f"{class_id} Detected", headers=headers, url=args.ntfy_url
)
im_array = r.plot()
# Scale back up the coordinates of the locations of detected objects.
# im_array = np.multiply(im_array, 1/args.run_scale)
@ -130,7 +195,6 @@ def main():
# print(im_array)
# exit()
cv2.imshow("View", im_array)
# Hit 'q' on the keyboard to quit!
if cv2.waitKey(1) & 0xFF == ord("q"):
@ -141,4 +205,5 @@ def main():
video_capture.release()
cv2.destroyAllWindows()
main()
main()

View File

@ -1,4 +1,3 @@
# def write_config():
# with open(config_path, "w") as config_file:
# json.dump(config, config_file, indent=4)

View File

@ -3,36 +3,30 @@ import httpx
def construct_ntfy_headers(
title: str = "Object/Person Detected",
tag = "rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
priority = "default", # https://docs.ntfy.sh/publish/#message-priority
) -> (dict):
return {
'Title': title,
'Priority': priority,
'Tags': tag
}
title: str = "Object/Person Detected",
tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
priority="default", # https://docs.ntfy.sh/publish/#message-priority
) -> dict:
return {"Title": title, "Priority": priority, "Tags": tag}
def send_notification(
data: str,
headers: dict,
url: str
):
def send_notification(data: str, headers: dict, url: str):
if url is None or data is None:
raise ValueError("url and data cannot be None")
httpx.post(url, data=data.encode('utf-8'), headers=headers)
raise ValueError("url and data cannot be None")
httpx.post(url, data=data.encode("utf-8"), headers=headers)
def check_last_seen(last_seen: datetime.datetime, seconds: int = 15):
'''
Check if a time is older than a given number of seconds
If it is, return True
If last_seen is empty/null, return True
'''
if (
datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds)
or last_seen == ""
or last_seen is None
):
return True
else:
return False
"""
Check if a time is older than a given number of seconds
If it is, return True
If last_seen is empty/null, return True
"""
if (
datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds)
or last_seen == ""
or last_seen is None
):
return True
else:
return False