2023-10-13 23:44:38 +01:00
|
|
|
import cv2
|
|
|
|
import numpy as np
|
2023-10-14 21:40:36 +01:00
|
|
|
from pathlib import Path
|
|
|
|
from deepface import DeepFace
|
2023-12-22 21:22:01 +00:00
|
|
|
from . import notify
|
2023-10-14 00:16:55 +01:00
|
|
|
|
2023-10-14 23:37:42 +01:00
|
|
|
# True until recognize_face() runs once; used there to decide whether the
# cached representations_arcface.pkl should be removed on the first call
first_face_try = True
|
|
|
|
|
2023-12-22 21:22:01 +00:00
|
|
|
# TODO: When multi-camera support is added, this will need to be changed so that each camera has its own dict
# Module-level detection state shared across frames:
#   "objects": populated lazily by process_footage() with one entry per YOLO
#              class name (timing info for detections/notifications)
#   "peoples": same idea, keyed by recognized person name (filled by notify)
objects_and_peoples = {
    "objects": {},
    "peoples": {},
}
|
|
|
|
|
|
|
|
|
|
|
|
def process_footage(
    # Frame
    frame: np.ndarray = None,
    # scale
    run_scale: float = None,
    view_scale: float = None,
    # Face stuff
    faces_directory: str = None,
    face_confidence_threshold: float = None,
    no_remove_representations: bool = False,
    # Timer stuff
    detection_window: int = None,
    detection_duration: int = None,
    notification_window: int = None,
    ntfy_url: str = None,
    # Object stuff
    # YOLO object
    model = None,
    detect_object: list = None,
    object_confidence_threshold = None
) -> np.ndarray:
    """
    Run one frame through YOLO (and, when a face database exists, face
    recognition), notify about anything detected, and return the frame
    annotated with labelled bounding boxes.

    Parameters:
        frame: Full-resolution BGR frame from the capture source.
        run_scale: Factor the frame is downscaled by before inference.
        view_scale: Factor the returned annotated frame is scaled by.
        faces_directory: Directory of reference faces; face recognition is
            skipped entirely when this directory does not exist.
        face_confidence_threshold: Minimum cosine similarity for a face match
            (forwarded to recognize_face as min_confidence).
        no_remove_representations: Forwarded to recognize_face().
        detection_window, detection_duration, notification_window:
            Timing parameters forwarded to notify.thing_detected().
        ntfy_url: Notification endpoint forwarded to notify.thing_detected().
        model: Loaded YOLO model; called as model(frame, verbose=False).
        detect_object: Class names to report; empty (or None) means "all".
        object_confidence_threshold: Minimum YOLO confidence to keep a box.

    Returns:
        The annotated frame, resized by view_scale (via plot_label).
    """

    global objects_and_peoples

    # Robustness: treat "no filter given" (None) the same as an empty filter
    # so the membership checks below cannot crash
    if detect_object is None:
        detect_object = []

    # Resize frame of video to a smaller size for faster recognition processing
    run_frame = cv2.resize(frame, (0, 0), fx=run_scale, fy=run_scale)
    # view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)

    results = model(run_frame, verbose=False)

    path_to_faces = Path(faces_directory)
    path_to_faces_exists = path_to_faces.is_dir()

    for i, r in enumerate(results):
        # list of dicts with each dict containing a label, x1, y1, x2, y2
        plot_boxes = []

        # The following is stuff for people
        # This is still in the for loop as each result, no matter if anything is detected, will be present.
        # Thus, there will always be one result (r)

        # Only run if path_to_faces exists
        # May be better to check every iteration, but this also works
        if path_to_faces_exists:
            if face_details := recognize_face(
                path_to_directory=path_to_faces,
                run_frame=run_frame,
                # Perhaps make these names match?
                min_confidence=face_confidence_threshold,
                no_remove_representations=no_remove_representations,
            ):
                plot_boxes.append(face_details)
                objects_and_peoples = notify.thing_detected(
                    thing_name=face_details["label"],
                    objects_and_peoples=objects_and_peoples,
                    detection_type="peoples",
                    detection_window=detection_window,
                    detection_duration=detection_duration,
                    notification_window=notification_window,
                    ntfy_url=ntfy_url,
                )

        # The following is stuff for objects
        # Setup dictionary of object names (lazily, on the first frame)
        # `not x` covers both the initial empty dict and None
        if not objects_and_peoples["objects"]:
            for name in r.names.values():
                objects_and_peoples["objects"][name] = {
                    "last_detection_time": None,
                    "detection_duration": None,
                    # "first_detection_time": None,
                    "last_notification_time": None,
                }
        # Also, make sure that the objects to detect are in the list of objects_and_peoples
        # If it isn't, print a warning
        # BUGFIX: membership must be tested against the "objects" sub-dict
        # (keyed by class name); the top-level dict only has the keys
        # "objects"/"peoples", so the old check warned for every object
        for obj in detect_object:
            if obj not in objects_and_peoples["objects"]:
                print(
                    f"Warning: {obj} is not in the list of objects the model can detect!"
                )

        for box in r.boxes:
            # Get the name of the object
            class_id = r.names[box.cls[0].item()]
            # Get the coordinates of the object
            cords = box.xyxy[0].tolist()
            cords = [round(x) for x in cords]
            # Get the confidence
            conf = round(box.conf[0].item(), 2)

            # Skip this detection if the confidence is too low, or if the
            # object is not in the (non-empty) list of objects to detect
            if conf < object_confidence_threshold or (
                class_id not in detect_object and detect_object != []
            ):
                continue

            # Add the object to the list of objects to plot
            plot_boxes.append(
                {
                    "label": class_id,
                    "x1": cords[0],
                    "y1": cords[1],
                    "x2": cords[2],
                    "y2": cords[3],
                }
            )

            objects_and_peoples = notify.thing_detected(
                thing_name=class_id,
                objects_and_peoples=objects_and_peoples,
                detection_type="objects",
                detection_window=detection_window,
                detection_duration=detection_duration,
                notification_window=notification_window,
                ntfy_url=ntfy_url,
            )

    # To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
    frame_to_show = plot_label(
        boxes=plot_boxes,
        full_frame=frame,
        # full_frame=r.plot(),
        run_scale=run_scale,
        view_scale=view_scale,
    )
    # Unsure if this should also return the objects_and_peoples dict
    return frame_to_show
|
|
|
|
|
2023-10-15 01:25:27 +01:00
|
|
|
|
2023-10-13 23:44:38 +01:00
|
|
|
def plot_label(
    # list of dicts with each dict containing a label, x1, y1, x2, y2
    boxes: list = None,
    # opencv image
    full_frame: np.ndarray = None,
    # run_scale is the scale of the image that was used to run the model
    # So the coordinates will be scaled up to the view frame size
    run_scale: float = None,
    # view_scale is the scale of the image, in relation to the full frame
    # So the coordinates will be scaled appropriately when coming from run_frame
    view_scale: float = None,
    font: int = cv2.FONT_HERSHEY_SIMPLEX,
) -> np.ndarray:
    """
    Draw every bounding box (and its label) onto a view-scaled copy of
    full_frame and return that annotated copy.

    Box coordinates arrive in run-frame space (x1,y1 = top-left,
    x2,y2 = bottom-right), so each one is mapped into view-frame space by
    dividing out run_scale and applying view_scale.
    Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25
    """
    GREEN = (0, 255, 0)

    # The frame we actually draw on: the full frame shrunk by view_scale
    view_frame = cv2.resize(full_frame, (0, 0), fx=view_scale, fy=view_scale)

    for entry in boxes:
        # Map run-frame coordinates into view-frame coordinates
        top_left = (
            int((entry["x1"] / run_scale) * view_scale),
            int((entry["y1"] / run_scale) * view_scale),
        )
        bottom_right = (
            int((entry["x2"] / run_scale) * view_scale),
            int((entry["y2"] / run_scale) * view_scale),
        )

        # Box outline, 2 px thick
        cv2.rectangle(view_frame, top_left, bottom_right, GREEN, 2)

        # Label text anchored 10 px above the box's top-left corner,
        # font scale 1, thickness 1
        text_origin = (top_left[0], top_left[1] - 10)
        cv2.putText(view_frame, entry["label"], text_origin, font, 1, GREEN, 1)

    return view_frame
|
2023-10-14 21:40:36 +01:00
|
|
|
|
|
|
|
|
|
|
|
def recognize_face(
    path_to_directory: Path = Path("faces"),
    # opencv image
    run_frame: np.ndarray = None,
    min_confidence: float = 0.3,
    no_remove_representations: bool = False,
) -> np.ndarray:
    """
    Search run_frame for a face matching the reference images.

    Accepts a path to a directory of images of faces to be used as a reference.
    In addition, accepts an opencv image to be used as the frame to be searched.

    Returns a single dictionary, as currently only 1 face can be detected in each frame.
    The dict contains the following keys: label, x1, y1, x2, y2.
    Returns None when no face is found, or when the best match's cosine
    similarity is below min_confidence (positive matches seem to be > 0.3).

    The directory should be structured as follows:
    faces/
        name/
            image1.jpg
            image2.jpg
            image3.jpg
        name2/
            image1.jpg
            image2.jpg
            image3.jpg
    (not necessarily jpgs, but you get the idea)

    Point is, `name` is the name of the person in the images in the directory `name`.
    That name will be used as the label for the face in the frame.
    """
    global first_face_try

    # On the very first call only, delete the cached DeepFace embeddings file
    # (representations_arcface.pkl) so it is rebuilt from the current images —
    # unless the caller opted out with no_remove_representations
    if first_face_try and not no_remove_representations:
        try:
            path_to_directory.joinpath("representations_arcface.pkl").unlink()
            print("Removing representations_arcface.pkl")
        except FileNotFoundError:
            print("representations_arcface.pkl does not exist")
        first_face_try = False
    elif first_face_try and no_remove_representations:
        print("Not attempting to remove representations_arcface.pkl")
        first_face_try = False

    # face_dataframes is a vanilla list of dataframes
    # It seems face_dataframes is empty if the face database (directory) doesn't exist. Seems to work if it's empty though
    # This line is here to prevent a crash if that happens. However, there is a check in __main__ so it shouldn't happen
    face_dataframes = []
    try:
        face_dataframes = DeepFace.find(
            run_frame,
            db_path=str(path_to_directory),
            # Problem with enforce_detection=False is that it will always (?) return a face, no matter the confidence
            # Thus, false-positives need to be filtered out
            enforce_detection=False,
            silent=True,
            # Could use VGG-Face, but whilst fixing another issue, ArcFace seemed to be slightly faster
            # I read somewhere that opencv is the fastest (but not as accurate). Could be changed later, but opencv seems to work well
            model_name="ArcFace",
            detector_backend="opencv",
        )

    # DeepFace signals "nothing found" conditions via ValueError; only two
    # specific messages are treated as a clean "no match" (return None),
    # anything else is re-raised
    except (ValueError) as e:
        if (
            str(e)
            == "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False."  # noqa: E501
        ):
            # print("No faces recognized")  # For debugging
            return None
        elif (
            # Check if the error message contains "Validate .jpg or .png files exist in this path."
            "Validate .jpg or .png files exist in this path." in str(e)
        ):
            # If a verbose/silent flag is added, this should be changed to print only if verbose is true
            # print("No faces found in database")
            return None
        else:
            raise e
    # Iterate over the dataframes
    for df in face_dataframes:
        # The last row is the highest confidence
        # So we can just grab the path from there
        # iloc = Integer LOCation
        try:
            path_to_image = Path(df.iloc[-1]["identity"])
        # Seems this is caused when someone steps into frame and their face is detected but not recognized
        except IndexError:
            print("Face present but not recognized")
            continue
        # If the parent name is the same as the path to the database, then set label to the image name instead of the parent name
        if path_to_image.parent == Path(path_to_directory):
            label = path_to_image.name
        else:
            label = path_to_image.parent.name
        # Return the coordinates of the box in xyxy format, rather than xywh
        # This is because YOLO uses xyxy, and that's how plot_label expects it
        # Also, xyxy is just the top left and bottom right corners of the box
        coordinates = {
            "x1": df.iloc[-1]["source_x"],
            "y1": df.iloc[-1]["source_y"],
            "x2": df.iloc[-1]["source_x"] + df.iloc[-1]["source_w"],
            "y2": df.iloc[-1]["source_y"] + df.iloc[-1]["source_h"],
        }
        # After some brief testing, it seems positive matches are > 0.3
        cosine_similarity = df.iloc[-1]["ArcFace_cosine"]
        if cosine_similarity < min_confidence:
            return None
        # label = "Unknown"
        to_return = dict(label=label, **coordinates)
        print(
            f"Cosine similarity: {cosine_similarity}, filname: {path_to_image.name}, to_return: {to_return}"
        )
        return to_return
    # No dataframe produced a usable match
    return None
|
2023-10-14 21:40:36 +01:00
|
|
|
|
2023-10-15 01:25:27 +01:00
|
|
|
"""
|
2023-10-14 21:40:36 +01:00
|
|
|
Example dataframe, for reference
|
|
|
|
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
|
2023-10-15 01:25:27 +01:00
|
|
|
"""
|