Merge pull request #5 from slashtechno/object-detection
Add object detection and switch to Deepface
This commit is contained in:
commit
0daa8ddd3b
|
@ -0,0 +1,42 @@
|
||||||
|
FROM python:3.10-bullseye
|
||||||
|
|
||||||
|
# Install Dlib (for face_recognition)
|
||||||
|
RUN apt-get -y update && apt-get install -y --fix-missing \
|
||||||
|
build-essential \
|
||||||
|
cmake \
|
||||||
|
gfortran \
|
||||||
|
git \
|
||||||
|
wget \
|
||||||
|
curl \
|
||||||
|
graphicsmagick \
|
||||||
|
libgraphicsmagick1-dev \
|
||||||
|
libatlas-base-dev \
|
||||||
|
libavcodec-dev \
|
||||||
|
libavformat-dev \
|
||||||
|
libgtk2.0-dev \
|
||||||
|
libjpeg-dev \
|
||||||
|
liblapack-dev \
|
||||||
|
libswscale-dev \
|
||||||
|
pkg-config \
|
||||||
|
python3-dev \
|
||||||
|
python3-numpy \
|
||||||
|
software-properties-common \
|
||||||
|
zip
|
||||||
|
RUN apt-get clean
|
||||||
|
RUN rm -rf /tmp/* /var/tmp/*
|
||||||
|
ENV CFLAGS=-static
|
||||||
|
# Install dos2unix
|
||||||
|
# RUN apt-get install -y dos2unix
|
||||||
|
# Upgrade pip
|
||||||
|
RUN pip3 install --upgrade pip
|
||||||
|
# Copy directory to container
|
||||||
|
WORKDIR /app
|
||||||
|
COPY . ./
|
||||||
|
# Run dos2unix on all files in /app
|
||||||
|
# RUN dos2unix /app/*
|
||||||
|
# Install from requirements.txt
|
||||||
|
RUN pip3 install -r requirements.txt
|
||||||
|
# Install wait-for-it so this can easily be used with docker-compose
|
||||||
|
# Example: command: ["./wait-for-it.sh", "bridge:8554", "--", "python", "main.py"]
|
||||||
|
RUN wget https://raw.githubusercontent.com/vishnubob/wait-for-it/master/wait-for-it.sh && chmod +x wait-for-it.sh && mv wait-for-it.sh /bin
|
||||||
|
CMD ["python3", "main.py"]
|
|
@ -1 +1,3 @@
|
||||||
.config/
|
Dockerfile
|
||||||
|
.venv
|
||||||
|
docker-compose.yml
|
|
@ -25,7 +25,7 @@ jobs:
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
- name: Log in to the Container registry
|
- name: Log in to the Container registry
|
||||||
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a
|
uses: docker/login-action@v3.0.0
|
||||||
with:
|
with:
|
||||||
registry: ${{ env.REGISTRY }}
|
registry: ${{ env.REGISTRY }}
|
||||||
username: ${{ github.actor }}
|
username: ${{ github.actor }}
|
||||||
|
|
|
@ -1,2 +1,8 @@
|
||||||
.env
|
.env
|
||||||
config/
|
config/
|
||||||
|
using_yolov8.ipynb
|
||||||
|
yolov8n.pt
|
||||||
|
.venv/
|
||||||
|
__pycache__/
|
||||||
|
faces/*
|
||||||
|
!faces/.gitkeep
|
|
@ -0,0 +1 @@
|
||||||
|
3.10.5
|
|
@ -0,0 +1,15 @@
|
||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python: Module",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
"module": "set_detect_notify",
|
||||||
|
"justMyCode": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
50
Dockerfile
50
Dockerfile
|
@ -1,42 +1,12 @@
|
||||||
FROM python:3.10-bullseye
|
FROM python:3.10.5-buster
|
||||||
|
|
||||||
|
RUN apt update && apt install libgl1 -y
|
||||||
|
RUN pip install poetry
|
||||||
|
|
||||||
# Install Dlib (for face_recognition)
|
|
||||||
RUN apt-get -y update && apt-get install -y --fix-missing \
|
|
||||||
build-essential \
|
|
||||||
cmake \
|
|
||||||
gfortran \
|
|
||||||
git \
|
|
||||||
wget \
|
|
||||||
curl \
|
|
||||||
graphicsmagick \
|
|
||||||
libgraphicsmagick1-dev \
|
|
||||||
libatlas-base-dev \
|
|
||||||
libavcodec-dev \
|
|
||||||
libavformat-dev \
|
|
||||||
libgtk2.0-dev \
|
|
||||||
libjpeg-dev \
|
|
||||||
liblapack-dev \
|
|
||||||
libswscale-dev \
|
|
||||||
pkg-config \
|
|
||||||
python3-dev \
|
|
||||||
python3-numpy \
|
|
||||||
software-properties-common \
|
|
||||||
zip
|
|
||||||
RUN apt-get clean
|
|
||||||
RUN rm -rf /tmp/* /var/tmp/*
|
|
||||||
ENV CFLAGS=-static
|
|
||||||
# Install dos2unix
|
|
||||||
# RUN apt-get install -y dos2unix
|
|
||||||
# Upgrade pip
|
|
||||||
RUN pip3 install --upgrade pip
|
|
||||||
# Copy directory to container
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY . ./
|
|
||||||
# Run dos2unix on all files in /app
|
COPY . .
|
||||||
# RUN dos2unix /app/*
|
|
||||||
# Install from requirements.txt
|
RUN poetry install
|
||||||
RUN pip3 install -r requirements.txt
|
|
||||||
# Install wait-for-it so this can easily be used with docker-compose
|
ENTRYPOINT ["poetry", "run", "python", "-m", "set_detect_notify"]
|
||||||
# Example: command: ["./wait-for-it.sh", "bridge:8554", "--", "python", "main.py"]
|
|
||||||
RUN wget https://raw.githubusercontent.com/vishnubob/wait-for-it/master/wait-for-it.sh && chmod +x wait-for-it.sh && mv wait-for-it.sh /bin
|
|
||||||
CMD ["python3", "main.py"]
|
|
65
README.md
65
README.md
|
@ -1,21 +1,54 @@
|
||||||
# Wyze Face Recognition
|
# Set, Detect, Notify
|
||||||
Recognize faces in Wyze Cam footage and send notifications to your phone (or other devices)
|
Recognize faces/objects in (Wyze Cam) footage and send notifications to your phone (or other devices)
|
||||||
|
|
||||||
## Pre-requisites
|
### Features
|
||||||
* Docker
|
- Recognize objects
|
||||||
* Docker Compose
|
- Recognize faces
|
||||||
* A Wyze Cam
|
- Send notifications to your phone (or other devices) using [ntfy](https://ntfy.sh/)
|
||||||
|
- Optionally, run headless with Docker
|
||||||
|
- Either use a webcam or an RTSP feed
|
||||||
|
- Use [mrlt8/docker-wyze-bridge](https://github.com/mrlt8/docker-wyze-bridge) to get RTSP feeds from Wyze Cams
|
||||||
|
|
||||||
## What's not needed
|
|
||||||
* A Wyze Cam subscription
|
|
||||||
|
|
||||||
## How to use
|
## Prerequisites
|
||||||
1. Clone this repo
|
### Poetry/Python
|
||||||
` git clone https://github.com/slackner/wyze-face-recognition.git`
|
- Camera, either a webcam or a Wyze Cam
|
||||||
2. Add images to the `config` directory
|
- All RTSP feeds _should_ work, however.
|
||||||
3. Copy `config/config.json.example` to `config/config.json` and edit the faces array to match the images you added, and the face names
|
- Python
|
||||||
4. Either set the `WYZE_EMAIL` and `WYZE_PASSWORD` environment variables, or edit `docker-compose.yml` and add your Wyze credentials
|
- Poetry
|
||||||
5. Run `docker-compose up -d`
|
### Docker
|
||||||
|
- A Wyze Cam
|
||||||
|
- Any other RTSP feed _should_ work, as mentioned above
|
||||||
|
- Python
|
||||||
|
- Poetry
|
||||||
|
|
||||||
|
## What's not required
|
||||||
|
- A Wyze subscription
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
### Installation
|
||||||
|
1. Clone this repo with `git clone https://github.com/slashtechno/wyze-face-recognition.git`
|
||||||
|
2. `cd` into the cloned repository
|
||||||
|
3. Then, either install with [Poetry](https://python-poetry.org/) or run with Docker
|
||||||
|
|
||||||
|
#### Docker
|
||||||
|
1. Modify to `docker-compose.yml` to achieve desired configuration
|
||||||
|
2. Run in the background with `docker compose up -d
|
||||||
|
|
||||||
|
#### Poetry
|
||||||
|
1. `poetry install`
|
||||||
|
2. `poetry run -- set-detect-notify`
|
||||||
|
### Configuration
|
||||||
|
The following are some basic CLI options. Most flags have environment variable equivalents which can be helpful when using Docker.
|
||||||
|
|
||||||
|
- For face recognition, put images of faces in subdirectories `./faces` (this can be changed with `--faces-directory`)
|
||||||
|
- Keep in mind, on the first run, face rec
|
||||||
|
- By default, notifications are sent for all objects. This can be changed with one or more occurrences of `--detect-object` to specify which objects to detect
|
||||||
|
- Currently, all classes in the [COCO](https://cocodataset.org/) dataset can be detected
|
||||||
|
- To specify where notifications are sent, specify a [ntfy](https://ntfy.sh/) URL with `--ntfy-url`
|
||||||
|
- To configure the program when using Docker, edit `docker-compose.yml` and/or set environment variables.
|
||||||
|
- **For further information, use `--help`**
|
||||||
|
|
||||||
### How to uninstall
|
### How to uninstall
|
||||||
1. Run `docker-compose down` in the `wyze-face-recognition` directory
|
- If you used Docker, run `docker-compose down --rmi all` in the cloned repository
|
||||||
|
- If you used Poetry, just delete the virtual environment and then the cloned repository
|
|
@ -1,17 +0,0 @@
|
||||||
{
|
|
||||||
"URL": "rtsp://bridge:8554/cv",
|
|
||||||
"RUN_SCALE": "0.5",
|
|
||||||
"VIEW_SCALE": "0.5",
|
|
||||||
"faces": {
|
|
||||||
"person1": {
|
|
||||||
"image": "config/person1.jpg",
|
|
||||||
"last_seen": ""
|
|
||||||
},
|
|
||||||
"person2": {
|
|
||||||
"image": "config/person2.jpg",
|
|
||||||
"last_seen": ""
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"display": false,
|
|
||||||
"ntfy_url": "http://ntfy:80/cam"
|
|
||||||
}
|
|
|
@ -0,0 +1,95 @@
|
||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from deepface import DeepFace\n",
|
||||||
|
"import cv2\n",
|
||||||
|
"from pathlib import Path\n",
|
||||||
|
"import uuid\n",
|
||||||
|
"import pandas as pd"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Take pictures"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# Take a picture using opencv with <uuid>.jpg\n",
|
||||||
|
"# Then delete it after\n",
|
||||||
|
"cap = cv2.VideoCapture(0)\n",
|
||||||
|
"ret, frame = cap.read()\n",
|
||||||
|
"cap.release()\n",
|
||||||
|
"# uuid_str = str(uuid.uuid4())\n",
|
||||||
|
"# uuid_path = Path(uuid_str + \".jpg\")\n",
|
||||||
|
"# cv2.imwrite(str(uuid_path), frame)\n",
|
||||||
|
"# dfs = DeepFace.find(img_path=str(uuid_path), db_path = \"faces\")\n",
|
||||||
|
"# Don't throw an error if no face is detected (enforce_detection=False)\n",
|
||||||
|
"dfs = DeepFace.find(frame, db_path = \"faces\", enforce_detection=False)\n",
|
||||||
|
"# Get the identity of the person\n",
|
||||||
|
"for i, pd_dataframe in enumerate(dfs):\n",
|
||||||
|
" # Sort the dataframe by confidence\n",
|
||||||
|
" # inplace=True means that the dataframe is modified so we don't need to assign it to a new variable\n",
|
||||||
|
" pd_dataframe.sort_values(by=['VGG-Face_cosine'], inplace=True, ascending=False)\n",
|
||||||
|
" print(f'On dataframe {i}')\n",
|
||||||
|
" print(pd_dataframe)\n",
|
||||||
|
" # Get the most likely identity\n",
|
||||||
|
" # print(f'Most likely identity: {pd_dataframe.iloc[0][\"identity\"]}')\n",
|
||||||
|
" # We could use Path to get the parent directory of the image to use as the identity\n",
|
||||||
|
" print(f'Most likely identity: {Path(pd_dataframe.iloc[0][\"identity\"]).parent.name}')\n",
|
||||||
|
" # Get the most likely identity's confidence\n",
|
||||||
|
" print(f'Confidence: {pd_dataframe.iloc[0][\"VGG-Face_cosine\"]}')\n",
|
||||||
|
"\n",
|
||||||
|
"# uuid_path.unlink()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Stream"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"DeepFace.stream(db_path=\"faces\")"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": ".venv",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.10.5"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
|
@ -6,19 +6,21 @@ services:
|
||||||
container_name: bridge-wyze
|
container_name: bridge-wyze
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
image: mrlt8/wyze-bridge:latest
|
image: mrlt8/wyze-bridge:latest
|
||||||
ports:
|
# I think we can remove the ports, since we're using the network
|
||||||
- 1935:1935 # RTMP
|
# Just an unnecesary security risk
|
||||||
- 8554:8554 # RTSP
|
# ports:
|
||||||
- 8888:8888 # HLS
|
# - 1935:1935 # RTMP
|
||||||
- 5000:5000 # WEB-UI
|
# - 8554:8554 # RTSP (this is really the only one we need)
|
||||||
|
# - 8888:8888 # HLS
|
||||||
|
# - 5000:5000 # WEB-UI
|
||||||
environment:
|
environment:
|
||||||
- WYZE_EMAIL=${WYZE_EMAIL} # Replace with wyze email
|
- WYZE_EMAIL=${WYZE_EMAIL} # Replace with wyze email
|
||||||
- WYZE_PASSWORD=${WYZE_PASSWORD} # Replace with wyze password
|
- WYZE_PASSWORD=${WYZE_PASSWORD} # Replace with wyze password
|
||||||
networks:
|
networks:
|
||||||
all:
|
all:
|
||||||
aliases:
|
# aliases:
|
||||||
- bridge
|
# - bridge
|
||||||
- wyze-bridge
|
# - wyze-bridge
|
||||||
ntfy:
|
ntfy:
|
||||||
image: binwiederhier/ntfy
|
image: binwiederhier/ntfy
|
||||||
container_name: ntfy-wyze
|
container_name: ntfy-wyze
|
||||||
|
@ -36,29 +38,34 @@ services:
|
||||||
all:
|
all:
|
||||||
facial_recognition:
|
facial_recognition:
|
||||||
container_name: face-recognition-wyze
|
container_name: face-recognition-wyze
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
image: ghcr.io/slashtechno/wyze_face_recognition:latest
|
# image: ghcr.io/slashtechno/wyze_face_recognition:latest
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile
|
||||||
volumes:
|
volumes:
|
||||||
# ./config is mounted as /app/config
|
- ./faces:/app/faces
|
||||||
- ./config:/app/config
|
|
||||||
networks:
|
networks:
|
||||||
all:
|
all:
|
||||||
environment:
|
environment:
|
||||||
- RUN_BY_COMPOSE=true
|
- URL=rtsp://bridge:8554/cv
|
||||||
|
- NO_DISPLAY=true
|
||||||
|
- NTFY_URL=http://ntfy:80/set-detect-notify
|
||||||
depends_on:
|
depends_on:
|
||||||
- bridge
|
- bridge
|
||||||
|
|
||||||
# Use curl to check if the rtsp stream is up, then run the face recognition
|
# Use curl to check if the rtsp stream is up, then run the face recognition
|
||||||
command: >
|
# command: >
|
||||||
/bin/sh -c "
|
# /bin/sh -c "
|
||||||
while true; do
|
# while true; do
|
||||||
curl -s http://bridge:8888/cv/0.m3u8 > /dev/null
|
# curl -s http://bridge:8888/cv/0.m3u8 > /dev/null
|
||||||
if [ $? -eq 0 ]; then
|
# if [ $? -eq 0 ]; then
|
||||||
echo 'Stream is up, running face recognition'
|
# echo 'Stream is up, running face recognition'
|
||||||
python3 /app/main.py
|
# python3 /app/main.py
|
||||||
else
|
# else
|
||||||
echo 'Stream is down, waiting 5 seconds'
|
# echo 'Stream is down, waiting 5 seconds'
|
||||||
sleep 5
|
# sleep 5
|
||||||
fi
|
# fi
|
||||||
done
|
# done
|
||||||
"
|
# "
|
||||||
tty: true
|
tty: true
|
BIN
environment.yml
BIN
environment.yml
Binary file not shown.
209
main.py
209
main.py
|
@ -1,209 +0,0 @@
|
||||||
import datetime
|
|
||||||
import face_recognition
|
|
||||||
import cv2
|
|
||||||
import numpy as np
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import pathlib
|
|
||||||
import requests
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
URL = os.getenv("URL")
|
|
||||||
RUN_SCALE = os.getenv("RUN_SCALE")
|
|
||||||
VIEW_SCALE = os.getenv("VIEW_SCALE")
|
|
||||||
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
||||||
# RUN_SCALE = 0.25
|
|
||||||
# VIEW_SCALE = 0.75
|
|
||||||
DISPLAY = False
|
|
||||||
RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE")
|
|
||||||
NTFY_URL = os.getenv("NTFY_URL")
|
|
||||||
|
|
||||||
|
|
||||||
def find_face_from_name(name):
|
|
||||||
for face in config["faces"]:
|
|
||||||
if config["faces"][face]["name"] == name:
|
|
||||||
return face
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def write_config():
|
|
||||||
with open(config_path, "w") as config_file:
|
|
||||||
json.dump(config, config_file, indent=4)
|
|
||||||
|
|
||||||
|
|
||||||
print("Hello, world!")
|
|
||||||
|
|
||||||
# Initialize some variables
|
|
||||||
face_locations = []
|
|
||||||
face_encodings = []
|
|
||||||
face_names = []
|
|
||||||
known_face_encodings = []
|
|
||||||
known_face_names = []
|
|
||||||
process_this_frame = True
|
|
||||||
|
|
||||||
# Load the config file, if it does not exist or is blank, create it
|
|
||||||
config = {
|
|
||||||
# If RUN_BY_COMPOSE is true, set url to rtsp://wyze-bridge:8554/wyze_cam_name, otherwise set it to "rtsp://localhost:8554/wyze_cam_name"
|
|
||||||
"URL": "rtsp://localhost:8554/wyze_cam_name"
|
|
||||||
if not RUN_BY_COMPOSE
|
|
||||||
else "rtsp://bridge:8554/wyze_cam_name",
|
|
||||||
"run_scale": "0.25",
|
|
||||||
"view_scale": "0.75",
|
|
||||||
"faces": {
|
|
||||||
"example1": {"image": "config/example1.jpg", "last_seen": ""},
|
|
||||||
"example2": {"image": "config/example2.jpg", "last_seen": ""},
|
|
||||||
},
|
|
||||||
"ntfy_url": "https://ntfy.sh/example",
|
|
||||||
"display": True,
|
|
||||||
}
|
|
||||||
config_path = pathlib.Path("config/config.json")
|
|
||||||
if config_path.exists():
|
|
||||||
with open(config_path, "r") as config_file:
|
|
||||||
config = json.load(config_file)
|
|
||||||
else:
|
|
||||||
with open(config_path, "w") as config_file:
|
|
||||||
json.dump(config, config_file, indent=4)
|
|
||||||
print("Config file created, please edit it and restart the program")
|
|
||||||
print("For relative paths, use the format config/example.jpg")
|
|
||||||
exit()
|
|
||||||
|
|
||||||
|
|
||||||
if URL:
|
|
||||||
config["URL"] = URL
|
|
||||||
else:
|
|
||||||
URL = config["URL"]
|
|
||||||
if RUN_SCALE:
|
|
||||||
config["RUN_SCALE"] = RUN_SCALE
|
|
||||||
else:
|
|
||||||
RUN_SCALE = float(config["RUN_SCALE"])
|
|
||||||
if VIEW_SCALE:
|
|
||||||
config["VIEW_SCALE"] = VIEW_SCALE
|
|
||||||
else:
|
|
||||||
VIEW_SCALE = float(config["VIEW_SCALE"])
|
|
||||||
if DISPLAY:
|
|
||||||
config["DISPLAY"] = DISPLAY
|
|
||||||
else:
|
|
||||||
DISPLAY = config["display"]
|
|
||||||
if NTFY_URL:
|
|
||||||
config["ntfy_url"] = NTFY_URL
|
|
||||||
else:
|
|
||||||
NTFY_URL = config["ntfy_url"]
|
|
||||||
print(f"Current config: {config}")
|
|
||||||
|
|
||||||
for face in config["faces"]:
|
|
||||||
# Load a sample picture and learn how to recognize it.
|
|
||||||
image = face_recognition.load_image_file(config["faces"][face]["image"])
|
|
||||||
face_encoding = face_recognition.face_encodings(image)[0]
|
|
||||||
known_face_encodings.append(face_encoding)
|
|
||||||
# Append the key to the list of known face names
|
|
||||||
known_face_names.append(face)
|
|
||||||
|
|
||||||
video_capture = cv2.VideoCapture(URL)
|
|
||||||
# Eliminate lag by setting the buffer size to 1
|
|
||||||
# This makes it so that the video capture will only grab the most recent frame
|
|
||||||
# However, this means that the video may be choppy
|
|
||||||
video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
||||||
|
|
||||||
# Print the resolution of the video
|
|
||||||
print(
|
|
||||||
f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
print("Beginning video capture...")
|
|
||||||
while True:
|
|
||||||
# Grab a single frame of video
|
|
||||||
ret, frame = video_capture.read()
|
|
||||||
# Only process every other frame of video to save time
|
|
||||||
# Resize frame of video to a smaller size for faster face recognition processing
|
|
||||||
run_frame = cv2.resize(frame, (0, 0), fx=RUN_SCALE, fy=RUN_SCALE)
|
|
||||||
view_frame = cv2.resize(frame, (0, 0), fx=VIEW_SCALE, fy=VIEW_SCALE)
|
|
||||||
# Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
|
|
||||||
rgb_run_frame = run_frame[:, :, ::-1]
|
|
||||||
# Find all the faces and face encodings in the current frame of video
|
|
||||||
# model cnn is gpu accelerated, but hog is cpu only
|
|
||||||
face_locations = face_recognition.face_locations(
|
|
||||||
rgb_run_frame, model="hog"
|
|
||||||
) # This crashes the program without output on my laptop when it's running without Docker compose
|
|
||||||
face_encodings = face_recognition.face_encodings(rgb_run_frame, face_locations)
|
|
||||||
face_names = []
|
|
||||||
for face_encoding in face_encodings:
|
|
||||||
# See if the face is a match for the known face(s)
|
|
||||||
matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
|
|
||||||
name = "Unknown"
|
|
||||||
# Or instead, use the known face with the smallest distance to the new face
|
|
||||||
face_distances = face_recognition.face_distance(
|
|
||||||
known_face_encodings, face_encoding
|
|
||||||
)
|
|
||||||
best_match_index = np.argmin(face_distances)
|
|
||||||
if matches[best_match_index]:
|
|
||||||
name = known_face_names[best_match_index]
|
|
||||||
last_seen = config["faces"][name]["last_seen"]
|
|
||||||
# If it's never been seen, set the last seen time to x+5 seconds ago so it will be seen
|
|
||||||
# Kind of a hacky way to do it, but it works... hopefully
|
|
||||||
if last_seen == "":
|
|
||||||
print(f"{name} has been seen for the first time")
|
|
||||||
config["faces"][name]["last_seen"] = (
|
|
||||||
datetime.datetime.now() - datetime.timedelta(seconds=15)
|
|
||||||
).strftime(DATETIME_FORMAT)
|
|
||||||
write_config()
|
|
||||||
# Check if the face has been seen in the last 5 seconds
|
|
||||||
if datetime.datetime.now() - datetime.datetime.strptime(
|
|
||||||
last_seen, DATETIME_FORMAT
|
|
||||||
) > datetime.timedelta(seconds=10):
|
|
||||||
print(f"{name} has been seen")
|
|
||||||
# Send a notification
|
|
||||||
print(f"Sending notification to{NTFY_URL}")
|
|
||||||
requests.post(
|
|
||||||
NTFY_URL,
|
|
||||||
data=f'"{name}" has been seen',
|
|
||||||
headers={
|
|
||||||
"Title": "Face Detected",
|
|
||||||
"Priority": "default",
|
|
||||||
"Tags": "neutral_face",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
# Update the last seen time
|
|
||||||
config["faces"][name]["last_seen"] = datetime.datetime.now().strftime(
|
|
||||||
DATETIME_FORMAT
|
|
||||||
)
|
|
||||||
# print("Writing config...")
|
|
||||||
write_config()
|
|
||||||
face_names.append(name)
|
|
||||||
# Display the results
|
|
||||||
# Iterate over each face found in the frame to draw a box around it
|
|
||||||
# Zip is used to iterate over two lists at the same time
|
|
||||||
for (top, right, bottom, left), name in zip(face_locations, face_names):
|
|
||||||
# print(f"Face found at {top}, {right}, {bottom}, {left} with name {name}")
|
|
||||||
# Scale back up face locations since the frame we detected in was scaled to 1/4 size
|
|
||||||
top = int(top * (VIEW_SCALE / RUN_SCALE))
|
|
||||||
right = int(right * (VIEW_SCALE / RUN_SCALE))
|
|
||||||
bottom = int(bottom * (VIEW_SCALE / RUN_SCALE))
|
|
||||||
left = int(left * (VIEW_SCALE / RUN_SCALE))
|
|
||||||
|
|
||||||
# Draw a box around the face
|
|
||||||
cv2.rectangle(view_frame, (left, top), (right, bottom), (0, 0, 255), 2)
|
|
||||||
|
|
||||||
# Draw a label with a name below the face
|
|
||||||
cv2.rectangle(
|
|
||||||
view_frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED
|
|
||||||
)
|
|
||||||
font = cv2.FONT_HERSHEY_DUPLEX
|
|
||||||
cv2.putText(
|
|
||||||
view_frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Display the resulting image if DISPLAY is set to true
|
|
||||||
if config["display"]:
|
|
||||||
cv2.imshow("Scaled View", view_frame)
|
|
||||||
|
|
||||||
# Hit 'q' on the keyboard to quit!
|
|
||||||
if cv2.waitKey(1) & 0xFF == ord("q"):
|
|
||||||
break
|
|
||||||
|
|
||||||
# Release handle to the webcam
|
|
||||||
print("Releasing video capture")
|
|
||||||
video_capture.release()
|
|
||||||
cv2.destroyAllWindows()
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,46 @@
|
||||||
|
[tool.poetry]
|
||||||
|
name = "set_detect_notify"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Detect all the things"
|
||||||
|
authors = ["slashtechno <77907286+slashtechno@users.noreply.github.com>"]
|
||||||
|
license = "MIT"
|
||||||
|
readme = "README.md"
|
||||||
|
packages = [{include = "set_detect_notify"}]
|
||||||
|
|
||||||
|
[tool.poetry.dependencies]
|
||||||
|
# python = "^3.10"
|
||||||
|
python = ">=3.10, <3.12"
|
||||||
|
python-dotenv = "^1.0.0"
|
||||||
|
httpx = "^0.25.0"
|
||||||
|
opencv-python = "^4.8.1.78"
|
||||||
|
ultralytics = "^8.0.190"
|
||||||
|
hjson = "^3.1.0"
|
||||||
|
numpy = "^1.23.2"
|
||||||
|
|
||||||
|
# https://github.com/python-poetry/poetry/issues/6409
|
||||||
|
torch = ">=2.0.0, !=2.0.1, !=2.1.0"
|
||||||
|
|
||||||
|
tensorflow-io-gcs-filesystem = "0.31.0"
|
||||||
|
deepface = "^0.0.79"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.poetry.group.dev.dependencies]
|
||||||
|
black = "^23.9.1"
|
||||||
|
ruff = "^0.0.291"
|
||||||
|
ipykernel = "^6.25.2"
|
||||||
|
nbconvert = "^7.9.2"
|
||||||
|
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["poetry-core"]
|
||||||
|
build-backend = "poetry.core.masonry.api"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
# More than the default (88) of `black` to make comments less of a headache
|
||||||
|
# Where possible, `black` will attempt to format to 88 characters
|
||||||
|
# However, setting ruff to 135 will allow for longer lines that can't be auto-formatted
|
||||||
|
line-length = 135
|
||||||
|
|
||||||
|
[tool.poetry.scripts]
|
||||||
|
set-detect-notify = "set_detect_notify.__main__:main"
|
|
@ -1,11 +0,0 @@
|
||||||
# certifi @ file:///croot/certifi_1665076670883/work/certifi
|
|
||||||
click==8.1.3
|
|
||||||
dlib==19.24.0
|
|
||||||
face-recognition==1.3.0
|
|
||||||
face-recognition-models==0.3.0
|
|
||||||
numpy==1.23.5
|
|
||||||
opencv-python==4.6.0.66
|
|
||||||
Pillow==10.0.1
|
|
||||||
python-dotenv==0.21.0
|
|
||||||
urllib3==1.26.13
|
|
||||||
requests==2.31.0
|
|
|
@ -0,0 +1,312 @@
|
||||||
|
# import face_recognition
|
||||||
|
import cv2
|
||||||
|
import dotenv
|
||||||
|
from pathlib import Path
|
||||||
|
import os
|
||||||
|
|
||||||
|
# import hjson as json
|
||||||
|
import torch
|
||||||
|
from ultralytics import YOLO
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from .utils import notify
|
||||||
|
from .utils import utils
|
||||||
|
|
||||||
|
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||||
|
args = None
|
||||||
|
|
||||||
|
objects_and_peoples = {
|
||||||
|
"objects": {},
|
||||||
|
"peoples": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
global objects_and_peoples
|
||||||
|
global args
|
||||||
|
# RUN_BY_COMPOSE = os.getenv("RUN_BY_COMPOSE") # Replace this with code to check for gpu
|
||||||
|
|
||||||
|
if Path(".env").is_file():
|
||||||
|
dotenv.load_dotenv()
|
||||||
|
print("Loaded .env file")
|
||||||
|
else:
|
||||||
|
print("No .env file found")
|
||||||
|
|
||||||
|
argparser = argparse.ArgumentParser(
|
||||||
|
prog="Detect It",
|
||||||
|
description="Detect it all!",
|
||||||
|
epilog=":)",
|
||||||
|
)
|
||||||
|
|
||||||
|
# required='RUN_SCALE' not in os.environ,
|
||||||
|
|
||||||
|
argparser.add_argument(
|
||||||
|
"--run-scale",
|
||||||
|
# Set it to the env RUN_SCALE if it isn't blank, otherwise set it to 0.25
|
||||||
|
default=os.environ["RUN_SCALE"]
|
||||||
|
if "RUN_SCALE" in os.environ and os.environ["RUN_SCALE"] != ""
|
||||||
|
# else 0.25,
|
||||||
|
else 1,
|
||||||
|
type=float,
|
||||||
|
help="The scale to run the detection at, default is 0.25",
|
||||||
|
)
|
||||||
|
argparser.add_argument(
|
||||||
|
"--view-scale",
|
||||||
|
# Set it to the env VIEW_SCALE if it isn't blank, otherwise set it to 0.75
|
||||||
|
default=os.environ["VIEW_SCALE"]
|
||||||
|
if "VIEW_SCALE" in os.environ and os.environ["VIEW_SCALE"] != ""
|
||||||
|
# else 0.75,
|
||||||
|
else 1,
|
||||||
|
type=float,
|
||||||
|
help="The scale to view the detection at, default is 0.75",
|
||||||
|
)
|
||||||
|
|
||||||
|
argparser.add_argument(
|
||||||
|
"--no-display",
|
||||||
|
default=os.environ["NO_DISPLAY"]
|
||||||
|
if "NO_DISPLAY" in os.environ and os.environ["NO_DISPLAY"] != ""
|
||||||
|
else False,
|
||||||
|
action="store_true",
|
||||||
|
help="Don't display the video feed",
|
||||||
|
)
|
||||||
|
|
||||||
|
argparser.add_argument(
|
||||||
|
"--confidence-threshold",
|
||||||
|
default=os.environ["CONFIDENCE_THRESHOLD"]
|
||||||
|
if "CONFIDENCE_THRESHOLD" in os.environ
|
||||||
|
and os.environ["CONFIDENCE_THRESHOLD"] != ""
|
||||||
|
else 0.6,
|
||||||
|
type=float,
|
||||||
|
help="The confidence threshold to use",
|
||||||
|
)
|
||||||
|
|
||||||
|
argparser.add_argument(
|
||||||
|
"--faces-directory",
|
||||||
|
default=os.environ["FACES_DIRECTORY"]
|
||||||
|
if "FACES_DIRECTORY" in os.environ and os.environ["FACES_DIRECTORY"] != ""
|
||||||
|
else "faces",
|
||||||
|
type=str,
|
||||||
|
help="The directory to store the faces. Should contain 1 subdirectory of images per person",
|
||||||
|
)
|
||||||
|
argparser.add_argument(
|
||||||
|
"--detect-object",
|
||||||
|
nargs="*",
|
||||||
|
default=[],
|
||||||
|
type=str,
|
||||||
|
help="The object(s) to detect. Must be something the model is trained to detect",
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_source = argparser.add_mutually_exclusive_group()
|
||||||
|
stream_source.add_argument(
|
||||||
|
"--url",
|
||||||
|
default=os.environ["URL"]
|
||||||
|
if "URL" in os.environ and os.environ["URL"] != ""
|
||||||
|
else None, # noqa: E501
|
||||||
|
type=str,
|
||||||
|
help="The URL of the stream to use",
|
||||||
|
)
|
||||||
|
stream_source.add_argument(
|
||||||
|
"--capture-device",
|
||||||
|
default=os.environ["CAPTURE_DEVICE"]
|
||||||
|
if "CAPTURE_DEVICE" in os.environ and os.environ["CAPTURE_DEVICE"] != ""
|
||||||
|
else 0, # noqa: E501
|
||||||
|
type=int,
|
||||||
|
help="The capture device to use. Can also be a url.",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Defaults for the stuff here and down are already set in notify.py.
|
||||||
|
# Setting them here just means that argparse will display the default values as defualt
|
||||||
|
# TODO: Perhaps just remove the default parameter and just add to the help message that the default is set is x
|
||||||
|
|
||||||
|
notifcation_services = argparser.add_argument_group("Notification Services")
|
||||||
|
notifcation_services.add_argument(
|
||||||
|
"--ntfy-url",
|
||||||
|
default=os.environ["NTFY_URL"]
|
||||||
|
if "NTFY_URL" in os.environ and os.environ["NTFY_URL"] != ""
|
||||||
|
else "https://ntfy.sh/set-detect-notify",
|
||||||
|
type=str,
|
||||||
|
help="The URL to send notifications to",
|
||||||
|
)
|
||||||
|
|
||||||
|
timers = argparser.add_argument_group("Timers")
|
||||||
|
timers.add_argument(
|
||||||
|
"--detection-duration",
|
||||||
|
default=os.environ["DETECTION_DURATION"]
|
||||||
|
if "DETECTION_DURATION" in os.environ and os.environ["DETECTION_DURATION"] != ""
|
||||||
|
else 2,
|
||||||
|
type=int,
|
||||||
|
help="The duration (in seconds) that an object must be detected for before sending a notification",
|
||||||
|
)
|
||||||
|
timers.add_argument(
|
||||||
|
"--detection-window",
|
||||||
|
default=os.environ["DETECTION_WINDOW"]
|
||||||
|
if "DETECTION_WINDOW" in os.environ and os.environ["DETECTION_WINDOW"] != ""
|
||||||
|
else 15,
|
||||||
|
type=int,
|
||||||
|
help="The time (seconds) before the detection duration resets",
|
||||||
|
)
|
||||||
|
timers.add_argument(
|
||||||
|
"--notification-window",
|
||||||
|
default=os.environ["NOTIFICATION_WINDOW"]
|
||||||
|
if "NOTIFICATION_WINDOW" in os.environ
|
||||||
|
and os.environ["NOTIFICATION_WINDOW"] != ""
|
||||||
|
else 30,
|
||||||
|
type=int,
|
||||||
|
help="The time (seconds) before another notification can be sent",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = argparser.parse_args()
|
||||||
|
|
||||||
|
# Check if a CUDA GPU is available. If it is, set it via torch. Ff not, set it to cpu
|
||||||
|
# https://github.com/ultralytics/ultralytics/issues/3084#issuecomment-1732433168
|
||||||
|
# Currently, I have been unable to set up Poetry to use GPU for Torch
|
||||||
|
for i in range(torch.cuda.device_count()):
|
||||||
|
print(torch.cuda.get_device_properties(i).name)
|
||||||
|
if torch.cuda.is_available():
|
||||||
|
torch.cuda.set_device(0)
|
||||||
|
print("Set CUDA device")
|
||||||
|
else:
|
||||||
|
print("No CUDA device available, using CPU")
|
||||||
|
|
||||||
|
model = YOLO("yolov8n.pt")
|
||||||
|
|
||||||
|
# Depending on if the user wants to use a stream or a capture device,
|
||||||
|
# Set the video capture to the appropriate source
|
||||||
|
if args.url:
|
||||||
|
video_capture = cv2.VideoCapture(args.url)
|
||||||
|
else:
|
||||||
|
video_capture = cv2.VideoCapture(args.capture_device)
|
||||||
|
|
||||||
|
# Eliminate lag by setting the buffer size to 1
|
||||||
|
# This makes it so that the video capture will only grab the most recent frame
|
||||||
|
# However, this means that the video may be choppy
|
||||||
|
video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||||||
|
|
||||||
|
# Print the resolution of the video
|
||||||
|
print(
|
||||||
|
f"Video resolution: {video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)}x{video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)}" # noqa: E501
|
||||||
|
)
|
||||||
|
|
||||||
|
print("Beginning video capture...")
|
||||||
|
while True:
|
||||||
|
# Grab a single frame of video
|
||||||
|
ret, frame = video_capture.read()
|
||||||
|
# Only process every other frame of video to save time
|
||||||
|
# Resize frame of video to a smaller size for faster recognition processing
|
||||||
|
run_frame = cv2.resize(frame, (0, 0), fx=args.run_scale, fy=args.run_scale)
|
||||||
|
# view_frame = cv2.resize(frame, (0, 0), fx=args.view_scale, fy=args.view_scale)
|
||||||
|
|
||||||
|
results = model(run_frame, verbose=False)
|
||||||
|
for i, r in enumerate(results):
|
||||||
|
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||||
|
plot_boxes = []
|
||||||
|
|
||||||
|
# The following is stuff for people
|
||||||
|
# This is still in the for loop as each result, no matter if anything is detected, will be present.
|
||||||
|
# Thus, there will always be one result (r)
|
||||||
|
if face_details := utils.recognize_face(
|
||||||
|
path_to_directory=Path(args.faces_directory), run_frame=run_frame
|
||||||
|
):
|
||||||
|
plot_boxes.append(face_details)
|
||||||
|
objects_and_peoples = notify.thing_detected(
|
||||||
|
thing_name=face_details["label"],
|
||||||
|
objects_and_peoples=objects_and_peoples,
|
||||||
|
detection_type="peoples",
|
||||||
|
detection_window=args.detection_window,
|
||||||
|
detection_duration=args.detection_duration,
|
||||||
|
notification_window=args.notification_window,
|
||||||
|
ntfy_url=args.ntfy_url,
|
||||||
|
)
|
||||||
|
|
||||||
|
# The following is stuff for objects
|
||||||
|
# Setup dictionary of object names
|
||||||
|
if (
|
||||||
|
objects_and_peoples["objects"] == {}
|
||||||
|
or objects_and_peoples["objects"] is None
|
||||||
|
):
|
||||||
|
for name in r.names.values():
|
||||||
|
objects_and_peoples["objects"][name] = {
|
||||||
|
"last_detection_time": None,
|
||||||
|
"detection_duration": None,
|
||||||
|
# "first_detection_time": None,
|
||||||
|
"last_notification_time": None,
|
||||||
|
}
|
||||||
|
# Also, make sure that the objects to detect are in the list of objects_and_peoples
|
||||||
|
# If it isn't, print a warning
|
||||||
|
for obj in args.detect_object:
|
||||||
|
if obj not in objects_and_peoples:
|
||||||
|
print(
|
||||||
|
f"Warning: {obj} is not in the list of objects the model can detect!"
|
||||||
|
)
|
||||||
|
|
||||||
|
for box in r.boxes:
|
||||||
|
# Get the name of the object
|
||||||
|
class_id = r.names[box.cls[0].item()]
|
||||||
|
# Get the coordinates of the object
|
||||||
|
cords = box.xyxy[0].tolist()
|
||||||
|
cords = [round(x) for x in cords]
|
||||||
|
# Get the confidence
|
||||||
|
conf = round(box.conf[0].item(), 2)
|
||||||
|
# Print it out, adding a spacer between each object
|
||||||
|
# print("Object type:", class_id)
|
||||||
|
# print("Coordinates:", cords)
|
||||||
|
# print("Probability:", conf)
|
||||||
|
# print("---")
|
||||||
|
|
||||||
|
# Now do stuff (if conf > 0.5)
|
||||||
|
if conf < args.confidence_threshold or (
|
||||||
|
class_id not in args.detect_object and args.detect_object != []
|
||||||
|
):
|
||||||
|
# If the confidence is too low
|
||||||
|
# or if the object is not in the list of objects to detect and the list of objects to detect is not empty
|
||||||
|
# then skip this iteration
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Add the object to the list of objects to plot
|
||||||
|
plot_boxes.append(
|
||||||
|
{
|
||||||
|
"label": class_id,
|
||||||
|
"x1": cords[0],
|
||||||
|
"y1": cords[1],
|
||||||
|
"x2": cords[2],
|
||||||
|
"y2": cords[3],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
objects_and_peoples = notify.thing_detected(
|
||||||
|
thing_name=class_id,
|
||||||
|
objects_and_peoples=objects_and_peoples,
|
||||||
|
detection_type="objects",
|
||||||
|
detection_window=args.detection_window,
|
||||||
|
detection_duration=args.detection_duration,
|
||||||
|
notification_window=args.notification_window,
|
||||||
|
ntfy_url=args.ntfy_url,
|
||||||
|
)
|
||||||
|
|
||||||
|
# To debug plotting, use r.plot() to cross reference the bounding boxes drawn by the plot_label() and r.plot()
|
||||||
|
frame_to_show = utils.plot_label(
|
||||||
|
boxes=plot_boxes,
|
||||||
|
full_frame=frame,
|
||||||
|
# full_frame=r.plot(),
|
||||||
|
run_scale=args.run_scale,
|
||||||
|
view_scale=args.view_scale,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Display the resulting frame
|
||||||
|
# cv2.imshow("", r)
|
||||||
|
if not args.no_display:
|
||||||
|
cv2.imshow(f"Video{i}", frame_to_show)
|
||||||
|
|
||||||
|
# Hit 'q' on the keyboard to quit!
|
||||||
|
if cv2.waitKey(1) & 0xFF == ord("q"):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Release handle to the webcam
|
||||||
|
print("Releasing video capture")
|
||||||
|
video_capture.release()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -0,0 +1,137 @@
|
||||||
|
import httpx
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
Structure of objects_and_peoples
|
||||||
|
Really, the only reason peoples is a separate dictionary is to prevent duplicates, though it just makes the code more complicated.
|
||||||
|
{
|
||||||
|
"objects": {
|
||||||
|
"object_name": {
|
||||||
|
"last_detection_time": float,
|
||||||
|
"detection_duration": float,
|
||||||
|
"last_notification_time": float,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"peoples": {
|
||||||
|
"person_name": {
|
||||||
|
"last_detection_time": float,
|
||||||
|
"detection_duration": float,
|
||||||
|
"last_notification_time": float,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
# objects_and_peoples = {}
|
||||||
|
|
||||||
|
|
||||||
|
def thing_detected(
|
||||||
|
thing_name: str,
|
||||||
|
objects_and_peoples: dict,
|
||||||
|
detection_type: str = "objects",
|
||||||
|
detection_window: int = 15,
|
||||||
|
detection_duration: int = 2,
|
||||||
|
notification_window: int = 15,
|
||||||
|
ntfy_url: str = "https://ntfy.sh/set-detect-notify",
|
||||||
|
) -> dict:
|
||||||
|
"""
|
||||||
|
A function to make sure 2 seconds of detection is detected in 15 seconds, 15 seconds apart.
|
||||||
|
Takes a dict that will be retured with the updated detection times. MAKE SURE TO SAVE THE RETURNED DICTIONARY
|
||||||
|
"""
|
||||||
|
|
||||||
|
# "Alias" the objects and peoples dictionaries so it's easier to work with
|
||||||
|
respective_type = objects_and_peoples[detection_type]
|
||||||
|
|
||||||
|
# (re)start cycle
|
||||||
|
try:
|
||||||
|
if (
|
||||||
|
# If the object has not been detected before
|
||||||
|
respective_type[thing_name]["last_detection_time"] is None
|
||||||
|
# If the last detection was more than 15 seconds ago
|
||||||
|
or time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
> detection_window
|
||||||
|
):
|
||||||
|
# Set the last detection time to now
|
||||||
|
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||||
|
print(f"First detection of {thing_name} in this detection window")
|
||||||
|
# This line is important. It resets the detection duration when the object hasn't been detected for a while
|
||||||
|
# If detection duration is None, don't print anything.
|
||||||
|
# Otherwise, print that the detection duration is being reset due to inactivity
|
||||||
|
if respective_type[thing_name]["detection_duration"] is not None:
|
||||||
|
print(
|
||||||
|
f"Resetting detection duration for {thing_name} since it hasn't been detected for {detection_window} seconds" # noqa: E501
|
||||||
|
)
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
else:
|
||||||
|
# Check if the last NOTIFICATION was less than 15 seconds ago
|
||||||
|
# If it was, then don't do anything
|
||||||
|
if (
|
||||||
|
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
<= notification_window
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
# If it was more than 15 seconds ago, reset the detection duration
|
||||||
|
# This effectively resets the notification timer
|
||||||
|
else:
|
||||||
|
print("Notification timer has expired - resetting")
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
respective_type[thing_name]["detection_duration"] += (
|
||||||
|
time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
)
|
||||||
|
# print("Updating detection duration")
|
||||||
|
respective_type[thing_name]["last_detection_time"] = time.time()
|
||||||
|
except KeyError:
|
||||||
|
# If the object has not been detected before
|
||||||
|
respective_type[thing_name] = {
|
||||||
|
"last_detection_time": time.time(),
|
||||||
|
"detection_duration": 0,
|
||||||
|
"last_notification_time": None,
|
||||||
|
}
|
||||||
|
print(f"First detection of {thing_name} ever")
|
||||||
|
|
||||||
|
# (re)send notification
|
||||||
|
# Check if detection has been ongoing for 2 seconds or more in the past 15 seconds
|
||||||
|
if (
|
||||||
|
respective_type[thing_name]["detection_duration"] >= detection_duration
|
||||||
|
and time.time() - respective_type[thing_name]["last_detection_time"]
|
||||||
|
<= detection_window
|
||||||
|
):
|
||||||
|
# If the last notification was more than 15 seconds ago, then send a notification
|
||||||
|
if (
|
||||||
|
respective_type[thing_name]["last_notification_time"] is None
|
||||||
|
or time.time() - respective_type[thing_name]["last_notification_time"]
|
||||||
|
> notification_window
|
||||||
|
):
|
||||||
|
respective_type[thing_name]["last_notification_time"] = time.time()
|
||||||
|
print(f"Detected {thing_name} for {detection_duration} seconds")
|
||||||
|
headers = construct_ntfy_headers(
|
||||||
|
title=f"{thing_name} detected",
|
||||||
|
tag="rotating_light",
|
||||||
|
priority="default",
|
||||||
|
)
|
||||||
|
send_notification(
|
||||||
|
data=f"{thing_name} detected for {detection_duration} seconds",
|
||||||
|
headers=headers,
|
||||||
|
url=ntfy_url,
|
||||||
|
)
|
||||||
|
# Reset the detection duration
|
||||||
|
print("Just sent a notification - resetting detection duration")
|
||||||
|
respective_type[thing_name]["detection_duration"] = 0
|
||||||
|
|
||||||
|
# Take the aliased objects_and_peoples and update the respective dictionary
|
||||||
|
objects_and_peoples[detection_type] = respective_type
|
||||||
|
return objects_and_peoples
|
||||||
|
|
||||||
|
|
||||||
|
def construct_ntfy_headers(
|
||||||
|
title: str = "Object/Person Detected",
|
||||||
|
tag="rotating_light", # https://docs.ntfy.sh/publish/#tags-emojis
|
||||||
|
priority="default", # https://docs.ntfy.sh/publish/#message-priority
|
||||||
|
) -> dict:
|
||||||
|
return {"Title": title, "Priority": priority, "Tags": tag}
|
||||||
|
|
||||||
|
|
||||||
|
def send_notification(data: str, headers: dict, url: str):
|
||||||
|
if url is None or data is None:
|
||||||
|
raise ValueError("url and data cannot be None")
|
||||||
|
httpx.post(url, data=data.encode("utf-8"), headers=headers)
|
|
@ -0,0 +1,149 @@
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
from pathlib import Path
|
||||||
|
from deepface import DeepFace
|
||||||
|
|
||||||
|
first_face_try = True
|
||||||
|
|
||||||
|
|
||||||
|
def plot_label(
|
||||||
|
# list of dicts with each dict containing a label, x1, y1, x2, y2
|
||||||
|
boxes: list = None,
|
||||||
|
# opencv image
|
||||||
|
full_frame: np.ndarray = None,
|
||||||
|
# run_scale is the scale of the image that was used to run the model
|
||||||
|
# So the coordinates will be scaled up to the view frame size
|
||||||
|
run_scale: float = None,
|
||||||
|
# view_scale is the scale of the image, in relation to the full frame
|
||||||
|
# So the coordinates will be scaled appropriately when coming from run_frame
|
||||||
|
view_scale: float = None,
|
||||||
|
font: int = cv2.FONT_HERSHEY_SIMPLEX,
|
||||||
|
):
|
||||||
|
# x1 and y1 are the top left corner of the box
|
||||||
|
# x2 and y2 are the bottom right corner of the box
|
||||||
|
# Example scaling: full_frame: 1 run_frame: 0.5 view_frame: 0.25
|
||||||
|
view_frame = cv2.resize(full_frame, (0, 0), fx=view_scale, fy=view_scale)
|
||||||
|
for thing in boxes:
|
||||||
|
cv2.rectangle(
|
||||||
|
# Image
|
||||||
|
view_frame,
|
||||||
|
# Top left corner
|
||||||
|
(
|
||||||
|
int((thing["x1"] / run_scale) * view_scale),
|
||||||
|
int((thing["y1"] / run_scale) * view_scale),
|
||||||
|
),
|
||||||
|
# Bottom right corner
|
||||||
|
(
|
||||||
|
int((thing["x2"] / run_scale) * view_scale),
|
||||||
|
int((thing["y2"] / run_scale) * view_scale),
|
||||||
|
),
|
||||||
|
# Color
|
||||||
|
(0, 255, 0),
|
||||||
|
# Thickness
|
||||||
|
2,
|
||||||
|
)
|
||||||
|
cv2.putText(
|
||||||
|
# Image
|
||||||
|
view_frame,
|
||||||
|
# Text
|
||||||
|
thing["label"],
|
||||||
|
# Origin
|
||||||
|
(
|
||||||
|
int((thing["x1"] / run_scale) * view_scale),
|
||||||
|
int((thing["y1"] / run_scale) * view_scale) - 10,
|
||||||
|
),
|
||||||
|
# Font
|
||||||
|
font,
|
||||||
|
# Font Scale
|
||||||
|
1,
|
||||||
|
# Color
|
||||||
|
(0, 255, 0),
|
||||||
|
# Thickness
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
return view_frame
|
||||||
|
|
||||||
|
|
||||||
|
def recognize_face(
|
||||||
|
path_to_directory: Path = Path("faces"),
|
||||||
|
# opencv image
|
||||||
|
run_frame: np.ndarray = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""
|
||||||
|
Accepts a path to a directory of images of faces to be used as a refference
|
||||||
|
In addition, accepts an opencv image to be used as the frame to be searched
|
||||||
|
|
||||||
|
Returns a single dictonary as currently only 1 face can be detected in each frame
|
||||||
|
dict contains the following keys: label, x1, y1, x2, y2
|
||||||
|
The directory should be structured as follows:
|
||||||
|
faces/
|
||||||
|
name/
|
||||||
|
image1.jpg
|
||||||
|
image2.jpg
|
||||||
|
image3.jpg
|
||||||
|
name2/
|
||||||
|
image1.jpg
|
||||||
|
image2.jpg
|
||||||
|
image3.jpg
|
||||||
|
(not neccessarily jpgs, but you get the idea)
|
||||||
|
|
||||||
|
Point is, `name` is the name of the person in the images in the directory `name`
|
||||||
|
That name will be used as the label for the face in the frame
|
||||||
|
"""
|
||||||
|
global first_face_try
|
||||||
|
|
||||||
|
# If it's the first time the function is being run, remove representations_vgg_face.pkl, if it exists
|
||||||
|
if first_face_try:
|
||||||
|
try:
|
||||||
|
Path("representations_vgg_face.pkl").unlink()
|
||||||
|
print("Removing representations_vgg_face.pkl")
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
first_face_try = False
|
||||||
|
|
||||||
|
# face_dataframes is a vanilla list of dataframes
|
||||||
|
try:
|
||||||
|
face_dataframes = DeepFace.find(
|
||||||
|
run_frame,
|
||||||
|
db_path=str(path_to_directory),
|
||||||
|
enforce_detection=True,
|
||||||
|
silent=True,
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
if (
|
||||||
|
str(e)
|
||||||
|
== "Face could not be detected. Please confirm that the picture is a face photo or consider to set enforce_detection param to False."
|
||||||
|
):
|
||||||
|
return None
|
||||||
|
# Iteate over the dataframes
|
||||||
|
for df in face_dataframes:
|
||||||
|
# The last row is the highest confidence
|
||||||
|
# So we can just grab the path from there
|
||||||
|
# iloc = Integer LOCation
|
||||||
|
path_to_image = Path(df.iloc[-1]["identity"])
|
||||||
|
# Get the name of the parent directory
|
||||||
|
label = path_to_image.parent.name
|
||||||
|
# Return the coordinates of the box in xyxy format, rather than xywh
|
||||||
|
# This is because YOLO uses xyxy, and that's how plot_label expects
|
||||||
|
# Also, xyxy is just the top left and bottom right corners of the box
|
||||||
|
coordinates = {
|
||||||
|
"x1": df.iloc[-1]["source_x"],
|
||||||
|
"y1": df.iloc[-1]["source_y"],
|
||||||
|
"x2": df.iloc[-1]["source_x"] + df.iloc[-1]["source_w"],
|
||||||
|
"y2": df.iloc[-1]["source_y"] + df.iloc[-1]["source_h"],
|
||||||
|
}
|
||||||
|
# After some brief testing, it seems positve matches are > 0.3
|
||||||
|
# I have not seen any false positives, so there is no threashold yet
|
||||||
|
distance = df.iloc[-1]["VGG-Face_cosine"]
|
||||||
|
# if 0.5 < distance < 0.7:
|
||||||
|
# label = "Unknown"
|
||||||
|
to_return = dict(label=label, **coordinates)
|
||||||
|
print(
|
||||||
|
f"Confindence: {distance}, filname: {path_to_image.name}, to_return: {to_return}"
|
||||||
|
)
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
"""
|
||||||
|
Example dataframe, for reference
|
||||||
|
identity (path to image) | source_x | source_y | source_w | source_h | VGG-Face_cosine (pretty much the confidence \_('_')_/)
|
||||||
|
"""
|
Loading…
Reference in New Issue