Allow program to be run as a script

This commit is contained in:
slashtechno 2023-10-14 15:46:42 -05:00
parent 3bf1966bfd
commit b6948aded2
Signed by: slashtechno
GPG Key ID: 8EC1D9D9286C2B17
6 changed files with 141 additions and 138 deletions

View File

@ -1,11 +1,11 @@
[tool.poetry] [tool.poetry]
name = "set-detect-notify" name = "set_detect_notify"
version = "0.1.0" version = "0.1.0"
description = "Detect all the things" description = "Detect all the things"
authors = ["slashtechno <77907286+slashtechno@users.noreply.github.com>"] authors = ["slashtechno <77907286+slashtechno@users.noreply.github.com>"]
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
packages = [{include = "set-detect-notify"}] packages = [{include = "set_detect_notify"}]
[tool.poetry.dependencies] [tool.poetry.dependencies]
# python = "^3.10" # python = "^3.10"
@ -41,3 +41,6 @@ build-backend = "poetry.core.masonry.api"
# Where possible, `black` will attempt to format to 88 characters # Where possible, `black` will attempt to format to 88 characters
# However, setting ruff to 135 will allow for longer lines that can't be auto-formatted # However, setting ruff to 135 will allow for longer lines that can't be auto-formatted
line-length = 135 line-length = 135
[tool.poetry.scripts]
set-detect-notify = "set_detect_notify.__main__:main"

View File

@ -275,5 +275,5 @@ def main():
video_capture.release() video_capture.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__":
main() main()

View File

@ -1,134 +1,134 @@
import httpx import httpx
import time import time
''' '''
Structure of objects_and_peoples Structure of objects_and_peoples
Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated. Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated.
TODO: Make a function to check if a person is in the objects dictionary and vice versa TODO: Make a function to check if a person is in the objects dictionary and vice versa
{ {
"objects": { "objects": {
"object_name": { "object_name": {
"last_detection_time": float, "last_detection_time": float,
"detection_duration": float, "detection_duration": float,
"last_notification_time": float, "last_notification_time": float,
}, },
}, },
"peoples": { "peoples": {
"person_name": { "person_name": {
"last_detection_time": float, "last_detection_time": float,
"detection_duration": float, "detection_duration": float,
"last_notification_time": float, "last_notification_time": float,
}, },
}, },
} }
''' '''
# objects_and_peoples = {} # objects_and_peoples = {}
def thing_detected( def thing_detected(
thing_name: str, thing_name: str,
objects_and_peoples: dict, objects_and_peoples: dict,
detection_type: str = "objects", detection_type: str = "objects",
detection_window: int = 15, detection_window: int = 15,
detection_duration: int = 2, detection_duration: int = 2,
notification_window: int = 15, notification_window: int = 15,
ntfy_url: str = "https://ntfy.sh/set-detect-notify" ntfy_url: str = "https://ntfy.sh/set-detect-notify"
) -> dict: ) -> dict:
''' '''
A function to make sure 2 seconds of detection is detected in 15 seconds, 15 seconds apart. A function to make sure 2 seconds of detection is detected in 15 seconds, 15 seconds apart.
Takes a dict that will be retured with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY Takes a dict that will be retured with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY
''' '''
# "Alias" the objects and peoples dictionaries so it's easier to work with # "Alias" the objects and peoples dictionaries so it's easier to work with
respective_type = objects_and_peoples[detection_type] respective_type = objects_and_peoples[detection_type]
# (re)start cycle # (re)start cycle
if ( if (
# If the object has not been detected before # If the object has not been detected before
respective_type[thing_name]["last_detection_time"] is None respective_type[thing_name]["last_detection_time"] is None
# If the last detection was more than 15 seconds ago # If the last detection was more than 15 seconds ago
or time.time() - respective_type[thing_name]["last_detection_time"] or time.time() - respective_type[thing_name]["last_detection_time"]
> detection_window > detection_window
): ):
# Set the last detection time to now # Set the last detection time to now
respective_type[thing_name]["last_detection_time"] = time.time() respective_type[thing_name]["last_detection_time"] = time.time()
print(f"First detection of {thing_name} in this detection window") print(f"First detection of {thing_name} in this detection window")
# This line is important. It resets the detection duration when the object hasn't been detected for a while # This line is important. It resets the detection duration when the object hasn't been detected for a while
# If detection duration is None, don't print anything. # If detection duration is None, don't print anything.
# Otherwise, print that the detection duration is being reset due to inactivity # Otherwise, print that the detection duration is being reset due to inactivity
if respective_type[thing_name]["detection_duration"] is not None: if respective_type[thing_name]["detection_duration"] is not None:
print( print(
f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds" # noqa: E501 f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds" # noqa: E501
) )
respective_type[thing_name]["detection_duration"] = 0 respective_type[thing_name]["detection_duration"] = 0
else: else:
# Check if the last notification was less than 15 seconds ago # Check if the last notification was less than 15 seconds ago
# If it was, then don't do anything # If it was, then don't do anything
if ( if (
time.time() - respective_type[thing_name]["last_detection_time"] time.time() - respective_type[thing_name]["last_detection_time"]
<= notification_window <= notification_window
): ):
pass pass
# If it was more than 15 seconds ago, reset the detection duration # If it was more than 15 seconds ago, reset the detection duration
# This effectively resets the notification timer # This effectively resets the notification timer
else: else:
print("Notification timer has expired - resetting") print("Notification timer has expired - resetting")
respective_type[thing_name]["detection_duration"] = 0 respective_type[thing_name]["detection_duration"] = 0
respective_type[thing_name]["detection_duration"] += ( respective_type[thing_name]["detection_duration"] += (
time.time() - respective_type[thing_name]["last_detection_time"] time.time() - respective_type[thing_name]["last_detection_time"]
) )
# print("Updating detection duration") # print("Updating detection duration")
respective_type[thing_name]["last_detection_time"] = time.time() respective_type[thing_name]["last_detection_time"] = time.time()
# (re)send notification # (re)send notification
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds # Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
if ( if (
respective_type[thing_name]["detection_duration"] respective_type[thing_name]["detection_duration"]
>= detection_duration >= detection_duration
and time.time() - respective_type[thing_name]["last_detection_time"] and time.time() - respective_type[thing_name]["last_detection_time"]
<= detection_window <= detection_window
): ):
# If the last notification was more than 15 seconds ago, then send a notification # If the last notification was more than 15 seconds ago, then send a notification
if ( if (
respective_type[thing_name]["last_notification_time"] is None respective_type[thing_name]["last_notification_time"] is None
or time.time() or time.time()
- respective_type[thing_name]["last_notification_time"] - respective_type[thing_name]["last_notification_time"]
> notification_window > notification_window
): ):
respective_type[thing_name]["last_notification_time"] = time.time() respective_type[thing_name]["last_notification_time"] = time.time()
print( print(
f"Detected {thing_name} for {detection_duration} seconds" f"Detected {thing_name} for {detection_duration} seconds"
) )
headers = construct_ntfy_headers( headers = construct_ntfy_headers(
title=f"{thing_name} detected", title=f"{thing_name} detected",
tag="rotating_light", tag="rotating_light",
priority="default", priority="default",
) )
send_notification( send_notification(
data=f"{thing_name} detected for {detection_duration} seconds", data=f"{thing_name} detected for {detection_duration} seconds",
headers=headers, headers=headers,
url=ntfy_url, url=ntfy_url,
) )
# Reset the detection duration # Reset the detection duration
print("Just sent a notification - resetting detection duration") print("Just sent a notification - resetting detection duration")
respective_type[thing_name]["detection_duration"] = 0 respective_type[thing_name]["detection_duration"] = 0
# Take the aliased objects_and_peoples and update the respective dictionary # Take the aliased objects_and_peoples and update the respective dictionary
objects_and_peoples[detection_type] = respective_type objects_and_peoples[detection_type] = respective_type
return objects_and_peoples return objects_and_peoples
def construct_ntfy_headers( def construct_ntfy_headers(
title: str = "Object/Person Detected", title: str = "Object/Person Detected",
tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
priority="default", # https://docs.ntfy.sh/publish/#message-priority priority="default", # https://docs.ntfy.sh/publish/#message-priority
) -> dict: ) -> dict:
return {"Title": title, "Priority": priority, "Tags": tag} return {"Title": title, "Priority": priority, "Tags": tag}
def send_notification(data: str, headers: dict, url: str): def send_notification(data: str, headers: dict, url: str):
if url is None or data is None: if url is None or data is None:
raise ValueError("url and data cannot be None") raise ValueError("url and data cannot be None")
httpx.post(url, data=data.encode("utf-8"), headers=headers) httpx.post(url, data=data.encode("utf-8"), headers=headers)