From a99a899417a9701a8feae534e8daac1435b7d77c Mon Sep 17 00:00:00 2001 From: slashtechno <77907286+slashtechno@users.noreply.github.com> Date: Wed, 4 Oct 2023 21:03:11 -0500 Subject: [PATCH] Added notification stuff --- .vscode/launch.json | 15 +++++ src/__main__.py | 113 ++++++++++++++++++++++++++++++-------- src/utils/config_utils.py | 1 - src/utils/notify.py | 52 ++++++++---------- 4 files changed, 127 insertions(+), 54 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1bf278f --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Module", + "type": "python", + "request": "launch", + "module": "src", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/src/__main__.py b/src/__main__.py index 1eda560..1770933 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -5,6 +5,7 @@ import dotenv from pathlib import Path import os import time + # import hjson as json import torch from ultralytics import YOLO @@ -16,7 +17,11 @@ from .utils import notify, config_utils DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" args = None +object_names = {} + + def main(): + global object_names global args # RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu @@ -32,14 +37,16 @@ def main(): epilog=":)", ) - # required='RUN_SCALE' not in os.environ, + # required='RUN_SCALE' not in os.environ, argparser.add_argument( - '--run-scale', - # Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25 - default=os.environ['RUN_SCALE'] if 'RUN_SCALE' in os.environ and os.environ['RUN_SCALE'] != '' else 0.25, # noqa: E501 - type=float, - help="The scale to run the detection at, default is 0.25", + "--run-scale", + # Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25 + default=os.environ["RUN_SCALE"] + if "RUN_SCALE" in os.environ and os.environ["RUN_SCALE"] != "" + else 0.25, # noqa: E501 + type=float, + help="The scale to run the detection at, default is 0.25", ) # argparser.add_argument( # '--view-scale', @@ -49,7 +56,7 @@ def main(): # help="The scale to view the detection at, default is 0.75", # ) - stream_source = argparser.add_mutually_exclusive_group() + stream_source = argparser.add_mutually_exclusive_group() # stream_source.add_argument( # '--url', # default=os.environ['URL'] if 'URL' in os.environ and os.environ['URL'] != '' else None, # noqa: E501 @@ -57,16 +64,20 @@ def main(): # help="The URL of the stream to use", # ) stream_source.add_argument( - '--capture-device', - default=os.environ['CAPTURE_DEVICE'] if 'CAPTURE_DEVICE' in os.environ and os.environ['CAPTURE_DEVICE'] != '' else 0, # noqa: E501 + "--capture-device", + default=os.environ["CAPTURE_DEVICE"] + if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != "" + else 0, # noqa: E501 type=int, - help="The capture device to use. Can also be a url." + help="The capture device to use. Can also be a url.", ) notifcation_services = argparser.add_argument_group("Notification Services") notifcation_services.add_argument( - '--ntfy-url', - default=os.environ['NTFY_URL'] if 'NTFY_URL' in os.environ and os.environ['NTFY_URL'] != '' else None, # noqa: E501 + "--ntfy-url", + default=os.environ["NTFY_URL"] + if "NTFY_URL" in os.environ and os.environ["NTFY_URL"] != "" + else "https://ntfy.sh/set-detect-notify", type=str, help="The URL to send notifications to", ) @@ -75,25 +86,24 @@ def main(): # Check if a CUDA GPU is available. If it is, set it via torch. Ff not, set it to cpu # https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168 - # Currently, I have been unable to set up Poetry to use GPU for Torch + # Currently, I have been unable to set up Poetry to use GPU for Torch for i in range(torch.cuda.device_count()): print(torch.cuda.get_device_properties(i).name) if torch.cuda.is_available(): torch.cuda.set_device(0) print("Set CUDA device") - else: + else: print("No CUDA device available, using CPU") - + model = YOLO("yolov8n.pt") # video_capture = cv2.VideoCapture(args.capture_device) - video_capture = cv2.VideoCapture(args.capture_device) + video_capture = cv2.VideoCapture("rtsp://192.168.1.7:8554/cv") # Eliminate lag by setting the buffer size to 1 # This makes it so that the video capture will only grab the most recent frame # However, this means that the video may be choppy video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1) - # Print the resolution of the video print( f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}" # noqa: E501 @@ -107,9 +117,17 @@ def main(): # Resize frame of video to a smaller size for faster recognition processing run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale) # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale) - + results = model(run_frame, verbose=False) for r in results: + # Setup dictionary of object names + if not object_names: + for name in r.names.values(): + object_names[name] = { + "last_detection_time": None, + "detection_duration": None, + # "first_detection_time": None, + } for box in r.boxes: # Get the name of the object class_id = r.names[box.cls[0].item()] @@ -119,10 +137,57 @@ def main(): # Get the confidence conf = round(box.conf[0].item(), 2) # Print it out, adding a spacer between each object - print("Object type:", class_id) - print("Coordinates:", cords) - print("Probability:", conf) - print("---") + # print("Object type:", class_id) + # print("Coordinates:", cords) + # print("Probability:", conf) + # print("---") + + # Now do stuff + + # If this is the first time the object has been detected + # or if it has been more than 15 seconds since the last detection + # reset the detection duration + + if ( + object_names[class_id]["last_detection_time"] is None + or time.time() - object_names[class_id]["last_detection_time"] > 15 + or object_names[class_id]["detection_duration"] is None + ): + print(f"First detection of {class_id}") + # time.time() returns the number of seconds since the epoch + object_names[class_id]["last_detection_time"] = time.time() + # object_names[class_id]["first_detection_time"] = time.time() + object_names[class_id]["detection_duration"] = 0 + headers = notify.construct_ntfy_headers( + title=f"{class_id} Detected", + tag="rotating_light", + priority="default", + ) + notify.send_notification( + data=f"{class_id} Detected", headers=headers, url=args.ntfy_url + ) + else: + # Add the time since the last detection to the total detection duration + object_names[class_id]["detection_duration"] += ( + time.time() - object_names[class_id]["last_detection_time"] + ) + + + # Check if detection has been ongoing for 2 seconds or more in the past 15 seconds + if ( + object_names[class_id]["detection_duration"] >= 2 + and time.time() - object_names[class_id]["last_detection_time"] + <= 15 + ): + print(f"Detected {class_id} for 2 seconds") + headers = notify.construct_ntfy_headers( + title=f"{class_id} Detected", + tag="rotating_light", + priority="default", + ) + notify.send_notification( + data=f"{class_id} Detected", headers=headers, url=args.ntfy_url + ) im_array = r.plot() # Scale back up the coordinates of the locations of detected objects. # im_array = np.multiply(im_array, 1/args.run_scale) @@ -130,7 +195,6 @@ def main(): # print(im_array) # exit() cv2.imshow("View", im_array) - # Hit 'q' on the keyboard to quit! if cv2.waitKey(1) & 0xFF == ord("q"): @@ -141,4 +205,5 @@ def main(): video_capture.release() cv2.destroyAllWindows() -main() \ No newline at end of file + +main() diff --git a/src/utils/config_utils.py b/src/utils/config_utils.py index 44f0f92..a0afe46 100644 --- a/src/utils/config_utils.py +++ b/src/utils/config_utils.py @@ -1,4 +1,3 @@ - # def write_config(): # with open(config_path, "w") as config_file: # json.dump(config, config_file, indent=4) diff --git a/src/utils/notify.py b/src/utils/notify.py index 6447d66..a4ba64e 100644 --- a/src/utils/notify.py +++ b/src/utils/notify.py @@ -3,36 +3,30 @@ import httpx def construct_ntfy_headers( - title: str = "Object/Person Detected", - tag = "rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis - priority = "default", # https://docs.ntfy.sh/publish/#message-priority -) -> (dict): - return { - 'Title': title, - 'Priority': priority, - 'Tags': tag - } + title: str = "Object/Person Detected", + tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis + priority="default", # https://docs.ntfy.sh/publish/#message-priority +) -> dict: + return {"Title": title, "Priority": priority, "Tags": tag} -def send_notification( - data: str, - headers: dict, - url: str -): + +def send_notification(data: str, headers: dict, url: str): if url is None or data is None: - raise ValueError("url and data cannot be None") - httpx.post(url, data=data.encode('utf-8'), headers=headers) + raise ValueError("url and data cannot be None") + httpx.post(url, data=data.encode("utf-8"), headers=headers) + def check_last_seen(last_seen: datetime.datetime, seconds: int = 15): - ''' - Check if a time is older than a given number of seconds - If it is, return True - If last_seen is empty/null, return True - ''' - if ( - datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds) - or last_seen == "" - or last_seen is None - ): - return True - else: - return False \ No newline at end of file + """ + Check if a time is older than a given number of seconds + If it is, return True + If last_seen is empty/null, return True + """ + if ( + datetime.datetime.now() - last_seen > datetime.timedelta(seconds=seconds) + or last_seen == "" + or last_seen is None + ): + return True + else: + return False