Added support for `ntfy.sh`

This commit is contained in:
slashtechno 2022-12-18 13:07:11 -06:00
parent dcc1061fcc
commit 2a809836c0
Signed by: slashtechno
GPG Key ID: 8EC1D9D9286C2B17
2 changed files with 68 additions and 80 deletions

Binary file not shown.

148
main.py
View File

@ -19,6 +19,7 @@ DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# VIEW_SCALE = 0.75 # VIEW_SCALE = 0.75
DISPLAY = False DISPLAY = False
RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE")
NTFY_URL = os.getenv("NTFY_URL")
def find_face_from_name(name): def find_face_from_name(name):
@ -46,14 +47,17 @@ process_this_frame = True
# Load the config file, if it does not exist or is blank, create it # Load the config file, if it does not exist or is blank, create it
config = { config = {
# If RUN_BY_COMPOSE is true, set url to rtsp://wyze-bridge:8554/wyze_cam_name, otherwise set it to "rtsp://localhost:8554/wyze_cam_name" # If RUN_BY_COMPOSE is true, set url to rtsp://wyze-bridge:8554/wyze_cam_name, otherwise set it to "rtsp://localhost:8554/wyze_cam_name"
"URL": "rtsp://localhost:8554/wyze_cam_name" if not RUN_BY_COMPOSE else "rtsp://bridge:8554/wyze_cam_name", "URL": "rtsp://localhost:8554/wyze_cam_name"
if not RUN_BY_COMPOSE
else "rtsp://bridge:8554/wyze_cam_name",
"run_scale": "0.25", "run_scale": "0.25",
"view_scale": "0.75", "view_scale": "0.75",
"faces": { "faces": {
"example1": {"image": "config/example1.jpg", "last_seen": ""}, "example1": {"image": "config/example1.jpg", "last_seen": ""},
"example2": {"image": "config/example2.jpg", "last_seen": ""}, "example2": {"image": "config/example2.jpg", "last_seen": ""},
}, },
"display": True "ntfy_url": "https://ntfy.sh/example",
"display": True,
} }
config_path = pathlib.Path("config/config.json") config_path = pathlib.Path("config/config.json")
if config_path.exists(): if config_path.exists():
@ -83,32 +87,12 @@ if DISPLAY:
config["DISPLAY"] = DISPLAY config["DISPLAY"] = DISPLAY
else: else:
DISPLAY = config["display"] DISPLAY = config["display"]
if NTFY_URL:
config["ntfy_url"] = NTFY_URL
else:
NTFY_URL = config["ntfy_url"]
print(f"Current config: {config}") print(f"Current config: {config}")
# Try this 5 times, 5 seconds apart. If the stream is not available, exit
# for i in range(5):
# # Check if HLS stream is available using the requests library
# # If it is not, print an error and exit
# url = URL.replace("rtsp", "http").replace(":8554", ":8888")
# print(f"Checking if HLS stream is available at {url}...")
# try:
# # Replace rtsp with http and the port with 8888
# r = requests.get(url)
# if r.status_code != 200:
# print("HLS stream not available, please check your URL")
# exit()
# except requests.exceptions.RequestException as e:
# print("HLS stream not available, please check your URL")
# if i == 4:
# exit()
# else:
# print(f"Retrying in 5 seconds ({i+1}/5)")
# time.sleep(5)
# continue
for face in config["faces"]: for face in config["faces"]:
# Load a sample picture and learn how to recognize it. # Load a sample picture and learn how to recognize it.
image = face_recognition.load_image_file(config["faces"][face]["image"]) image = face_recognition.load_image_file(config["faces"][face]["image"])
@ -124,70 +108,73 @@ video_capture = cv2.VideoCapture(URL)
video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1) video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Print the resolution of the video # Print the resolution of the video
print(f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}") print(
f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}"
)
print("Beginning video capture...")
while True: while True:
# Grab a single frame of video # Grab a single frame of video
ret, frame = video_capture.read() ret, frame = video_capture.read()
# Only process every other frame of video to save time # Only process every other frame of video to save time
# if process_this_frame: # Resize frame of video to a smaller size for faster face recognition processing
if True: run_frame = cv2.resize(frame, (0, 0), fx=RUN_SCALE, fy=RUN_SCALE)
# Resize frame of video to a smaller size for faster face recognition processing view_frame = cv2.resize(frame, (0, 0), fx=VIEW_SCALE, fy=VIEW_SCALE)
run_frame = cv2.resize(frame, (0, 0), fx=RUN_SCALE, fy=RUN_SCALE) # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
view_frame = cv2.resize(frame, (0, 0), fx=VIEW_SCALE, fy=VIEW_SCALE) rgb_run_frame = run_frame[:, :, ::-1]
# Find all the faces and face encodings in the current frame of video
# Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses) # model cnn is gpu accelerated, but hog is cpu only
rgb_run_frame = run_frame[:, :, ::-1] face_locations = face_recognition.face_locations(rgb_run_frame, model="hog") # This crashes the program without output on my laptop when it's running without Docker compose
face_encodings = face_recognition.face_encodings(rgb_run_frame, face_locations)
# Find all the faces and face encodings in the current frame of video face_names = []
# model cnn is gpu accelerated, but hog is cpu only for face_encoding in face_encodings:
face_locations = face_recognition.face_locations(rgb_run_frame, model="hog") # See if the face is a match for the known face(s)
face_encodings = face_recognition.face_encodings(rgb_run_frame, face_locations) matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
name = "Unknown"
face_names = [] # Or instead, use the known face with the smallest distance to the new face
for face_encoding in face_encodings: face_distances = face_recognition.face_distance(
# See if the face is a match for the known face(s) known_face_encodings, face_encoding
matches = face_recognition.compare_faces( )
known_face_encodings, face_encoding best_match_index = np.argmin(face_distances)
) if matches[best_match_index]:
name = "Unknown" name = known_face_names[best_match_index]
last_seen = config["faces"][name]["last_seen"]
# Or instead, use the known face with the smallest distance to the new face # If it's never been seen, set the last seen time to x+5 seconds ago so it will be seen
face_distances = face_recognition.face_distance( # Kind of a hacky way to do it, but it works... hopefully
known_face_encodings, face_encoding if last_seen == "":
) print(f"{name} has been seen")
best_match_index = np.argmin(face_distances) config["faces"][name]["last_seen"] = (
if matches[best_match_index]: datetime.datetime.now() - datetime.timedelta(seconds=15)
# print("For debugging, I found a face!!!! :D this should not be included in the final product lol :P") ).strftime(DATETIME_FORMAT)
name = known_face_names[best_match_index]
last_seen = config["faces"][name]["last_seen"]
# If it's never been seen, set the last seen time to six seconds ago so it will be seen
# Kind of a hacky way to do it, but it works... hopefully
if last_seen == "":
print(f"{name} has been seen")
config["faces"][name]["last_seen"] = (
datetime.datetime.now() - datetime.timedelta(seconds=6)
).strftime(DATETIME_FORMAT)
write_config()
# Check if the face has been seen in the last 5 seconds
if datetime.datetime.now() - datetime.datetime.strptime(
last_seen, DATETIME_FORMAT
) > datetime.timedelta(seconds=5):
print(f"{name} has been seen")
# Update the last seen time
config["faces"][name]["last_seen"] = datetime.datetime.now().strftime(
DATETIME_FORMAT
)
write_config() write_config()
face_names.append(name) # Check if the face has been seen in the last 5 seconds
if datetime.datetime.now() - datetime.datetime.strptime(
process_this_frame = not process_this_frame last_seen, DATETIME_FORMAT
) > datetime.timedelta(seconds=10):
print(f"{name} has been seen")
# Update the last seen time
config["faces"][name]["last_seen"] = datetime.datetime.now().strftime(
DATETIME_FORMAT
)
# Send a notification
print(f"Sending notification to{NTFY_URL}")
requests.post(
NTFY_URL,
data=f'"{name}" has been seen',
headers={
"Title": "Face Detected",
"Priority": "urgent",
"Tags": "neutral_face",
},
)
print("Writing config...")
write_config()
face_names.append(name)
# Display the results # Display the results
# Iterate over each face found in the frame to draw a box around it # Iterate over each face found in the frame to draw a box around it
# Zip is used to iterate over two lists at the same time # Zip is used to iterate over two lists at the same time
for (top, right, bottom, left), name in zip(face_locations, face_names): for (top, right, bottom, left), name in zip(face_locations, face_names):
print(f"Face found at {top}, {right}, {bottom}, {left} with name {name}")
# Scale back up face locations since the frame we detected in was scaled to 1/4 size # Scale back up face locations since the frame we detected in was scaled to 1/4 size
top = int(top * (VIEW_SCALE / RUN_SCALE)) top = int(top * (VIEW_SCALE / RUN_SCALE))
right = int(right * (VIEW_SCALE / RUN_SCALE)) right = int(right * (VIEW_SCALE / RUN_SCALE))
@ -215,5 +202,6 @@ while True:
break break
# Release handle to the webcam # Release handle to the webcam
print("Releasing video capture")
video_capture.release() video_capture.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()