wyzely-detect/set_detect_notify/utils/notify.py

138 lines
5.5 KiB
Python
Raw Normal View History

2023-10-14 21:46:42 +01:00
import httpx
import time
2023-10-15 01:25:27 +01:00
"""
Structure of objects_and_peoples

"peoples" is kept as a separate dictionary from "objects" only to prevent
duplicates, even though it makes the code more complicated.

{
    "objects": {
        "object_name": {
            "last_detection_time": float,
            "detection_duration": float,
            "last_notification_time": float,
        },
    },
    "peoples": {
        "person_name": {
            "last_detection_time": float,
            "detection_duration": float,
            "last_notification_time": float,
        },
    },
}
"""
2023-10-14 21:46:42 +01:00
# objects_and_peoples = {}
def thing_detected(
    thing_name: str,
    objects_and_peoples: dict,
    detection_type: str = "objects",
    detection_window: int = 15,
    detection_duration: int = 2,
    notification_window: int = 15,
    ntfy_url: str = "https://ntfy.sh/set-detect-notify",
) -> dict:
    """Record one detection of ``thing_name`` and notify once it has persisted.

    With the defaults this requires 2 seconds of accumulated detection within
    a 15 second window, and sends notifications at most 15 seconds apart.

    The per-thing record is updated in place inside ``objects_and_peoples``
    and the same dictionary is returned. MAKE SURE TO SAVE THE RETURNED
    DICTIONARY and pass it back in on the next call.

    Args:
        thing_name: Key of the detected object/person inside its bucket.
        objects_and_peoples: State dictionary mapping a detection type to a
            dictionary of per-thing records. A missing ``detection_type``
            bucket is created on demand, so ``{}`` is a valid initial state.
        detection_type: Bucket to use ("objects" by default).
        detection_window: Maximum gap, in seconds, between two detections
            for them to count as the same detection streak.
        detection_duration: Accumulated seconds of detection required
            before a notification is sent.
        notification_window: Minimum number of seconds between two
            notifications for the same thing.
        ntfy_url: URL the notification is posted to.

    Returns:
        ``objects_and_peoples`` (the same object that was passed in).
    """
    # Read the clock once so every comparison and every stored timestamp in
    # this call refers to the same instant.
    now = time.time()

    # "Alias" the bucket for this detection type so it's easier to work with;
    # create it if the caller started from an empty state dictionary.
    respective_type = objects_and_peoples.setdefault(detection_type, {})

    # (re)start cycle
    try:
        # Only these lookups are expected to raise KeyError.
        record = respective_type[thing_name]
        last_seen = record["last_detection_time"]
        accumulated = record["detection_duration"]
    except KeyError:
        # The thing has never been detected before (or its record is missing
        # one of the keys above): start from a fresh record.
        record = {
            "last_detection_time": now,
            "detection_duration": 0,
            "last_notification_time": None,
        }
        respective_type[thing_name] = record
        print(f"First detection of {thing_name} ever")
    else:
        if last_seen is None or now - last_seen > detection_window:
            # No detection inside the window: a new streak starts now.
            print(f"First detection of {thing_name} in this detection window")
            # Only announce the reset if there was a duration to reset.
            if accumulated is not None:
                print(
                    f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds"  # noqa: E501
                )
            record["detection_duration"] = 0
        else:
            # NOTE(review): this compares the time since the last DETECTION
            # (not the last notification) against notification_window. It can
            # only trigger when notification_window < detection_window, so it
            # never triggers with the defaults. Confirm whether the last
            # notification time was intended.
            if now - last_seen > notification_window:
                print("Notification timer has expired - resetting")
                accumulated = 0
            # The gap since the previous detection is counted as detected
            # time. A stored duration of None is treated as 0 rather than
            # raising TypeError.
            record["detection_duration"] = (accumulated or 0) + (now - last_seen)
        record["last_detection_time"] = now

    # (re)send notification
    # last_detection_time was refreshed to `now` on every path above, so the
    # detection is inside detection_window by construction; only the
    # accumulated duration still has to be checked.
    if record["detection_duration"] >= detection_duration:
        last_notified = record.get("last_notification_time")
        # Notify only if no notification has been sent yet, or the last one
        # was sent more than notification_window seconds ago.
        if last_notified is None or now - last_notified > notification_window:
            # Recorded before sending, so the timestamp is kept even if
            # send_notification raises.
            record["last_notification_time"] = now
            print(f"Detected {thing_name} for {detection_duration} seconds")
            headers = construct_ntfy_headers(
                title=f"{thing_name} detected",
                tag="rotating_light",
                priority="default",
            )
            send_notification(
                data=f"{thing_name} detected for {detection_duration} seconds",
                headers=headers,
                url=ntfy_url,
            )
            # Reset the detection duration
            print("Just sent a notification - resetting detection duration")
            record["detection_duration"] = 0

    # Store the (aliased) bucket back under its type and hand the state back.
    objects_and_peoples[detection_type] = respective_type
    return objects_and_peoples
def construct_ntfy_headers(
    title: str = "Object/Person Detected",
    tag="rotating_light",  # https://docs.ntfy.sh/publish/#tags-emojis
    priority="default",  # https://docs.ntfy.sh/publish/#message-priority
) -> dict:
    """Build the HTTP header dictionary for one ntfy notification.

    Args:
        title: Value for the "Title" header.
        tag: Value for the "Tags" header.
        priority: Value for the "Priority" header.

    Returns:
        A new dictionary with the keys "Title", "Priority" and "Tags".
    """
    ntfy_headers = dict(Title=title, Priority=priority, Tags=tag)
    return ntfy_headers
def send_notification(data: str, headers: dict, url: str):
    """POST ``data`` as the UTF-8 encoded request body to ``url``.

    Args:
        data: Message text; sent as the raw request body.
        headers: HTTP headers to send with the request.
        url: Endpoint the message is posted to.

    Raises:
        ValueError: If ``url`` or ``data`` is None.

    The response is not inspected, so an HTTP error status does not raise
    here. Exceptions raised by ``httpx.post`` itself are not caught.
    """
    if url is None or data is None:
        raise ValueError("url and data cannot be None")
    # `content=` is httpx's parameter for a raw bytes/text body; `data=` is
    # meant for form fields and is deprecated for raw bodies.
    httpx.post(url, content=data.encode("utf-8"), headers=headers)