diff --git a/environment.yml b/environment.yml index c8725cc..207f2c7 100644 Binary files a/environment.yml and b/environment.yml differ diff --git a/main.py b/main.py index 3aa90a7..0f2b78f 100644 --- a/main.py +++ b/main.py @@ -19,6 +19,7 @@ DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" # VIEW_SCALE = 0.75 DISPLAY = False RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") +NTFY_URL = os.getenv("NTFY_URL") def find_face_from_name(name): @@ -46,14 +47,17 @@ process_this_frame = True # Load the config file, if it does not exist or is blank, create it config = { # If RUN_BY_COMPOSE is true, set url to rtsp://wyze-bridge:8554/wyze_cam_name, otherwise set it to "rtsp://localhost:8554/wyze_cam_name" - "URL": "rtsp://localhost:8554/wyze_cam_name" if not RUN_BY_COMPOSE else "rtsp://bridge:8554/wyze_cam_name", + "URL": "rtsp://localhost:8554/wyze_cam_name" + if not RUN_BY_COMPOSE + else "rtsp://bridge:8554/wyze_cam_name", "run_scale": "0.25", "view_scale": "0.75", "faces": { "example1": {"image": "config/example1.jpg", "last_seen": ""}, "example2": {"image": "config/example2.jpg", "last_seen": ""}, }, - "display": True + "ntfy_url": "https://ntfy.sh/example", + "display": True, } config_path = pathlib.Path("config/config.json") if config_path.exists(): @@ -83,32 +87,12 @@ if DISPLAY: config["DISPLAY"] = DISPLAY else: DISPLAY = config["display"] - +if NTFY_URL: + config["ntfy_url"] = NTFY_URL +else: + NTFY_URL = config["ntfy_url"] print(f"Current config: {config}") -# Try this 5 times, 5 seconds apart. If the stream is not available, exit -# for i in range(5): -# # Check if HLS stream is available using the requests library -# # If it is not, print an error and exit -# url = URL.replace("rtsp", "http").replace(":8554", ":8888") -# print(f"Checking if HLS stream is available at {url}...") -# try: -# # Replace rtsp with http and the port with 8888 -# r = requests.get(url) -# if r.status_code != 200: -# print("HLS stream not available, please check your URL") -# exit() -# except requests.exceptions.RequestException as e: -# print("HLS stream not available, please check your URL") -# if i == 4: -# exit() -# else: -# print(f"Retrying in 5 seconds ({i+1}/5)") -# time.sleep(5) -# continue - - - for face in config["faces"]: # Load a sample picture and learn how to recognize it. image = face_recognition.load_image_file(config["faces"][face]["image"]) @@ -124,70 +108,73 @@ video_capture = cv2.VideoCapture(URL) video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Print the resolution of the video -print(f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}") +print( + f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}" +) +print("Beginning video capture...") while True: # Grab a single frame of video ret, frame = video_capture.read() - # Only process every other frame of video to save time - # if process_this_frame: - if True: - # Resize frame of video to a smaller size for faster face recognition processing - run_frame = cv2.resize(frame, (0, 0), fx=RUN_SCALE, fy=RUN_SCALE) - view_frame = cv2.resize(frame, (0, 0), fx=VIEW_SCALE, fy=VIEW_SCALE) - - # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses) - rgb_run_frame = run_frame[:, :, ::-1] - - # Find all the faces and face encodings in the current frame of video - # model cnn is gpu accelerated, but hog is cpu only - face_locations = face_recognition.face_locations(rgb_run_frame, model="hog") - face_encodings = face_recognition.face_encodings(rgb_run_frame, face_locations) - - face_names = [] - for face_encoding in face_encodings: - # See if the face is a match for the known face(s) - matches = face_recognition.compare_faces( - known_face_encodings, face_encoding - ) - name = "Unknown" - - # Or instead, use the known face with the smallest distance to the new face - face_distances = face_recognition.face_distance( - known_face_encodings, face_encoding - ) - best_match_index = np.argmin(face_distances) - if matches[best_match_index]: - # print("For debugging, I found a face!!!! :D this should not be included in the final product lol :P") - name = known_face_names[best_match_index] - last_seen = config["faces"][name]["last_seen"] - # If it's never been seen, set the last seen time to six seconds ago so it will be seen - # Kind of a hacky way to do it, but it works... hopefully - if last_seen == "": - print(f"{name} has been seen") - config["faces"][name]["last_seen"] = ( - datetime.datetime.now() - datetime.timedelta(seconds=6) - ).strftime(DATETIME_FORMAT) - write_config() - # Check if the face has been seen in the last 5 seconds - if datetime.datetime.now() - datetime.datetime.strptime( - last_seen, DATETIME_FORMAT - ) > datetime.timedelta(seconds=5): - print(f"{name} has been seen") - # Update the last seen time - config["faces"][name]["last_seen"] = datetime.datetime.now().strftime( - DATETIME_FORMAT - ) + # Resize frame of video to a smaller size for faster face recognition processing + run_frame = cv2.resize(frame, (0, 0), fx=RUN_SCALE, fy=RUN_SCALE) + view_frame = cv2.resize(frame, (0, 0), fx=VIEW_SCALE, fy=VIEW_SCALE) + # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses) + rgb_run_frame = run_frame[:, :, ::-1] + # Find all the faces and face encodings in the current frame of video + # model cnn is gpu accelerated, but hog is cpu only + face_locations = face_recognition.face_locations(rgb_run_frame, model="hog") # This crashes the program without output on my laptop when it's running without Docker compose + face_encodings = face_recognition.face_encodings(rgb_run_frame, face_locations) + face_names = [] + for face_encoding in face_encodings: + # See if the face is a match for the known face(s) + matches = face_recognition.compare_faces(known_face_encodings, face_encoding) + name = "Unknown" + # Or instead, use the known face with the smallest distance to the new face + face_distances = face_recognition.face_distance( + known_face_encodings, face_encoding + ) + best_match_index = np.argmin(face_distances) + if matches[best_match_index]: + name = known_face_names[best_match_index] + last_seen = config["faces"][name]["last_seen"] + # If it's never been seen, set the last seen time to x+5 seconds ago so it will be seen + # Kind of a hacky way to do it, but it works... hopefully + if last_seen == "": + print(f"{name} has been seen") + config["faces"][name]["last_seen"] = ( + datetime.datetime.now() - datetime.timedelta(seconds=15) + ).strftime(DATETIME_FORMAT) write_config() - face_names.append(name) - - process_this_frame = not process_this_frame - + # Check if the face has been seen in the last 5 seconds + if datetime.datetime.now() - datetime.datetime.strptime( + last_seen, DATETIME_FORMAT + ) > datetime.timedelta(seconds=10): + print(f"{name} has been seen") + # Update the last seen time + config["faces"][name]["last_seen"] = datetime.datetime.now().strftime( + DATETIME_FORMAT + ) + # Send a notification + print(f"Sending notification to{NTFY_URL}") + requests.post( + NTFY_URL, + data=f'"{name}" has been seen', + headers={ + "Title": "Face Detected", + "Priority": "urgent", + "Tags": "neutral_face", + }, + ) + print("Writing config...") + write_config() + face_names.append(name) # Display the results # Iterate over each face found in the frame to draw a box around it # Zip is used to iterate over two lists at the same time for (top, right, bottom, left), name in zip(face_locations, face_names): + print(f"Face found at {top}, {right}, {bottom}, {left} with name {name}") # Scale back up face locations since the frame we detected in was scaled to 1/4 size top = int(top * (VIEW_SCALE / RUN_SCALE)) right = int(right * (VIEW_SCALE / RUN_SCALE)) @@ -215,5 +202,6 @@ while True: break # Release handle to the webcam +print("Releasing video capture") video_capture.release() cv2.destroyAllWindows()