Added `cmd.py`

Removed dependence on `utils.py` for loading API credentials
This commit is contained in:
slashtechno 2023-07-31 22:39:12 -05:00
parent a1ca75a5dc
commit f3cf25dfd7
Signed by: slashtechno
GPG Key ID: 8EC1D9D9286C2B17
11 changed files with 220 additions and 11846 deletions

4
.gitignore vendored
View File

@ -2,3 +2,7 @@
__pycache__/ __pycache__/
tmp.txt tmp.txt
.env .env
blocklists/*
!blocklists/.gitkeep
tmp.py
.venv

14
.vscode/extensions.json vendored Normal file
View File

@ -0,0 +1,14 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=827846 to learn about workspace recommendations.
// Extension identifier format: ${publisher}.${name}. Example: vscode.csharp
// List of extensions which should be recommended for users of this workspace.
"recommendations": [
"charliermarsh.ruff"
],
// List of extensions recommended by VS Code that should not be recommended for users of this workspace.
"unwantedRecommendations": [
]
}

4
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,4 @@
{
"python.languageServer": "Pylance",
"python.analysis.ignore": [ "*" ] // Ruff is used for linting but Pylance still is useful
}

0
blocklists/.gitkeep Normal file
View File

File diff suppressed because it is too large Load Diff

71
poetry.lock generated
View File

@ -95,6 +95,17 @@ files = [
{file = "charset_normalizer-3.2.0-py3-none-any.whl", hash = "sha256:8e098148dd37b4ce3baca71fb394c81dc5d9c7728c95df695d2dca218edf40e6"}, {file = "charset_normalizer-3.2.0-py3-none-any.whl", hash = "sha256:8e098148dd37b4ce3baca71fb394c81dc5d9c7728c95df695d2dca218edf40e6"},
] ]
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.4" version = "3.4"
@ -107,18 +118,22 @@ files = [
] ]
[[package]] [[package]]
name = "python-dotenv" name = "loguru"
version = "1.0.0" version = "0.7.0"
description = "Read key-value pairs from a .env file and set them as environment variables" description = "Python logging made (stupidly) simple"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.5"
files = [ files = [
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"}, {file = "loguru-0.7.0-py3-none-any.whl", hash = "sha256:b93aa30099fa6860d4727f1b81f8718e965bb96253fa190fab2077aaad6d15d3"},
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"}, {file = "loguru-0.7.0.tar.gz", hash = "sha256:1612053ced6ae84d7959dd7d5e431a0532642237ec21f7fd83ac73fe539e03e1"},
] ]
[package.dependencies]
colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
[package.extras] [package.extras]
cli = ["click (>=5.0)"] dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"]
[[package]] [[package]]
name = "requests" name = "requests"
@ -141,6 +156,32 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"] socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "ruff"
version = "0.0.281"
description = "An extremely fast Python linter, written in Rust."
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.0.281-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:418fbddfd3dba4d7b11e4e016eacc40d321ff0b7d3637c7ba9ad3ee0474c9a35"},
{file = "ruff-0.0.281-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:c086bf3968d5cb2b4f31a586fc73bc42cb688c32f4c992ff161d4ce19f551cf2"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0162b149a94f6007768820bcdf4ccb7e90a21655aac829ace49f4682d0565fdb"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f3495175e6d85a01d3da409a079461a5a3c15b70237cc82550ad8c1f091002c8"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ae0b836c03a7010527bb56384a4e3718e0958e32bea64459879aacdcb65c4945"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:6d34cae6ef6c6b6fd6d4f09271fbf635db49e6b788da1b2e1dea11a29f1c2a11"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:dd3c94260a148e955fb46f41d4bcecd857c75794e9f06ebfa7f9be65cfed9621"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2ccb875a4000bcba6cc61cb9d3cd5969d6b0921b5234f0ef99ad75f74e8935ef"},
{file = "ruff-0.0.281-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f5b8ccaabad61e2d50494df820b7bafd94eac13f10d2d8b831994c1618801a9"},
{file = "ruff-0.0.281-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:cbf279fd9c2ca674896656df2d82831010afd336a6703a060fe08d6f2358e47b"},
{file = "ruff-0.0.281-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:24d0defeb2c6a1b16a4230840d1138e08bc4ef2318496fa6ff7ddbf3a443626f"},
{file = "ruff-0.0.281-py3-none-musllinux_1_2_i686.whl", hash = "sha256:54bab7128167057ee5987bbd9f925fbf105071068de9d8474ca7c38f684b8463"},
{file = "ruff-0.0.281-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:29a22b7a6433ce0b4e601897e8a5dd58a75c75c01afee9b8922ebbdd1fe51e51"},
{file = "ruff-0.0.281-py3-none-win32.whl", hash = "sha256:7b781f6a7ed35196e6565ed32f57d07b852b0dcd7158c6c7669c8b5d0f8cf97a"},
{file = "ruff-0.0.281-py3-none-win_amd64.whl", hash = "sha256:70f921438bf09f04c0547cf64c137c87ef33cbec2b64be12b8caa87df261a016"},
{file = "ruff-0.0.281-py3-none-win_arm64.whl", hash = "sha256:42a92a62fc841f7444821444553fd6e1e700bb55348f24e8ec39afdd4e3d0312"},
{file = "ruff-0.0.281.tar.gz", hash = "sha256:bab2cdfa78754315cccc2b4d46ad6181aabb29e89747a3b135a4b85e11baa025"},
]
[[package]] [[package]]
name = "urllib3" name = "urllib3"
version = "2.0.4" version = "2.0.4"
@ -158,7 +199,21 @@ secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"] zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "win32-setctime"
version = "1.1.0"
description = "A small Python utility to set file creation time on Windows"
optional = false
python-versions = ">=3.5"
files = [
{file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
{file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
]
[package.extras]
dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.10" python-versions = "^3.10"
content-hash = "b5a2a9bb04c1aabc3900490b31fc56bddc8d7ca4be53a6b2dd50a3d8ee90c6de" content-hash = "2213d241196503176caefedd103dfc06ddd3a12c386e2948706dc0d732d7ed50"

View File

@ -10,9 +10,12 @@ packages = [{include = "cloudflare_gateway_adblocking"}]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.10"
requests = "^2.31.0" requests = "^2.31.0"
python-dotenv = "^1.0.0" loguru = "^0.7.0"
[tool.poetry.group.dev.dependencies]
ruff = "^0.0.281"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"

39
src/cmd.py Normal file
View File

@ -0,0 +1,39 @@
import utils
import argparse
import os
from loguru import logger
def cli():
# Set up argparse
argparser = argparse.ArgumentParser()
# Add arguments
argparser.add_argument(
"--account-id",
"-a",
help="Cloudflare account ID - environment variable: CLOUDFLARE_ACCOUNT_ID",
default=os.environ.get("CLOUDFLARE_ACCOUNT_ID")
)
argparser.add_argument('--token',
'-t',
help='Cloudflare API token - environment variable: CLOUDFLARE_TOKEN',
default=os.environ.get("CLOUDFLARE_TOKEN")
)
# Parse arguments
args = argparser.parse_args()
logger.debug(args)
# Load variables
TOKEN = args.token
ACCOUNT_ID = args.account_id
# Check if variables are set
if TOKEN is None or ACCOUNT_ID is None:
logger.error("No environment variables found. Please create a .env file or .envrc file") # noqa E501
exit()
if __name__ == "__main__":
cli()

View File

@ -1,12 +1,7 @@
import os
import requests import requests
import utils.utils import utils
import pathlib import pathlib
# Load environment variables
TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"]
def get_blocklists(): def get_blocklists():
# __file__ is a special variable that is the path to the current file # __file__ is a special variable that is the path to the current file
@ -35,15 +30,15 @@ def split_list(blocklists):
return lists return lists
def upload_to_cloudflare(lists): def upload_to_cloudflare(lists, account_id: str, token: str) -> None:
# A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists # A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists
for i, lst in enumerate(lists): for i, lst in enumerate(lists):
list_name = f"adblock-list-{i + 1}" list_name = f"adblock-list-{i + 1}"
url = ( url = (
f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists" f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists"
) )
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
@ -65,10 +60,10 @@ def upload_to_cloudflare(lists):
print(f"Error uploading {list_name}: {response.text}") print(f"Error uploading {list_name}: {response.text}")
def create_dns_policy(lists): def create_dns_policy(lists, account_id: str, token: str) -> None:
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules" url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/rules"
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
# Construct the traffic string # Construct the traffic string

View File

@ -1,18 +1,14 @@
# This is a scriprt to undo the changes made by adblock-zerotrust.py # This is a scriprt to undo the changes made by adblock-zerotrust.py
import requests import requests
import utils.utils import utils
# Load environment variables
TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"]
def delete_adblock_list(lists: dict): def delete_adblock_list(lists: dict, account_id: str, token: str):
for lst in lists: for lst in lists:
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists/{lst["id"]}' url = f'https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists/{lst["id"]}'
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
response = requests.delete(url, headers=headers, timeout=10) response = requests.delete(url, headers=headers, timeout=10)
@ -22,12 +18,12 @@ def delete_adblock_list(lists: dict):
print(f'Deleted list {lst["name"]}') print(f'Deleted list {lst["name"]}')
def delete_adblock_policy(policies: dict): def delete_adblock_policy(policies: dict, account_id: str, token: str):
for policy in policies: for policy in policies:
if policy["name"] == "Block Ads": if policy["name"] == "Block Ads":
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules/{policy["id"]}' url = f'https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/rules/{policy["id"]}'
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
response = requests.delete(url, headers=headers, timeout=10) response = requests.delete(url, headers=headers, timeout=10)

View File

@ -1,15 +1,63 @@
import pathlib
import os import os
import pathlib
import requests import requests
from dotenv import load_dotenv from dotenv import load_dotenv
class Config:
def __init__(self, token, account_id):
# Set token using setter
self.token = token
if account_id is None:
raise ValueError("No account ID provided")
@property
def token(self):
return self._token
@token.setter
def token(self, token):
if token is None:
raise ValueError("No token provided")
else:
url = 'https://api.cloudflare.com/client/v4/user/tokens/verify'
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
response = requests.get(url, headers=headers, timeout=10)
if response.json()['success'] is False:
raise ValueError('Invalid token')
else:
# Token needs the following scopes:
# Zero Trust: Read/Edit
# Account Firewall Access Rules: Read/Edit
# Access Apps and Policies: Read/Edit
self._token = token
@property
def account_id(self):
return self._account_id
@account_id.setter
def account_id(self, account_id):
if account_id is None:
raise ValueError("No account ID provided")
else:
# Possibly make a request to lists to check if the account ID exists
self._account_id = account_id
# List Utils # List Utils
# Convert a hosts file to a simple hostname list # Convert a hosts file to a simple hostname list
def convert_to_list(file: pathlib.Path) -> list: def convert_to_list(file: pathlib.Path) -> list:
with open(file, "r") as f: with open(file, "r") as f:
# Don't read commented lines; strip whitespace; remove 127.0.0.1 from beginning of line; ignore lines with "localhost"; ignore lines with "::1"; ignore newlines blocklists.extend([i[10:].strip() for i in f.readlines() if not i.startswith('#') and 'localhost' not in i and '::1' not in i]) # Don't read commented lines; strip whitespace;
# remove 127.0.0.1 from beginning of line;
# ignore lines with "localhost"; ignore lines with "::1";
# ignore newlines
hosts = [ hosts = [
i[10:].strip() i[10:].strip()
for i in f.readlines() for i in f.readlines()
@ -19,7 +67,9 @@ def convert_to_list(file: pathlib.Path) -> list:
# for x in f.readlines(): # for x in f.readlines():
# if not x.startswith('#') and 'localhost' not in x and '::1' not in x: # if not x.startswith('#') and 'localhost' not in x and '::1' not in x:
# hosts.append(x[10:].strip()) # hosts.append(x[10:].strip())
# If there are any empty strings in the list, remove them since for some reason, whitespace is stil in the list
# If there are any empty strings in the list, remove them
# For some reason, whitelist seems to still be present
hosts = [i for i in hosts if i != ""] hosts = [i for i in hosts if i != ""]
return hosts return hosts
@ -28,30 +78,30 @@ def convert_to_list(file: pathlib.Path) -> list:
# Load environment variables # Load environment variables
def load_env() -> dict: # def load_env() -> dict:
load_dotenv(".env") # load_dotenv(".env")
if not os.environ.get("CLOUDFLARE_TOKEN") and not os.environ.get( # if not os.environ.get("CLOUDFLARE_TOKEN") and not os.environ.get(
"CLOUDFLARE_ACCOUNT_ID" # "CLOUDFLARE_ACCOUNT_ID"
): # ):
load_dotenv(".envrc") # load_dotenv(".envrc")
if not os.environ.get("CLOUDFLARE_TOKEN") or not os.environ.get( # if not os.environ.get("CLOUDFLARE_TOKEN") or not os.environ.get(
"CLOUDFLARE_ACCOUNT_ID" # "CLOUDFLARE_ACCOUNT_ID"
): # ):
print( # print(
"No environment variables found. Please create a .env file or .envrc file" # "No environment variables found. Please create a .env file or .envrc file"
) # )
exit() # exit()
else: # else:
return { # return {
"CLOUDFLARE_TOKEN": os.environ.get("CLOUDFLARE_TOKEN"), # "CLOUDFLARE_TOKEN": os.environ.get("CLOUDFLARE_TOKEN"),
"CLOUDFLARE_ACCOUNT_ID": os.environ.get("CLOUDFLARE_ACCOUNT_ID"), # "CLOUDFLARE_ACCOUNT_ID": os.environ.get("CLOUDFLARE_ACCOUNT_ID"),
} # }
def get_lists() -> dict: def get_lists(account_id, token) -> dict:
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists" url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists"
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
response = requests.get(url, headers=headers, timeout=10) response = requests.get(url, headers=headers, timeout=10)
@ -68,17 +118,13 @@ def filter_adblock_lists(lists: dict) -> dict:
return adblock_lists return adblock_lists
def get_gateway_rules() -> dict: def get_gateway_rules(account_id, token) -> dict:
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules" url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/rules"
headers = { headers = {
"Authorization": f"Bearer {TOKEN}", "Authorization": f"Bearer {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
response = requests.get(url, headers=headers, timeout=10) response = requests.get(url, headers=headers, timeout=10)
if response.status_code != 200: if response.status_code != 200:
print(f"Error getting lists: {response.text}") print(f"Error getting lists: {response.text}")
return response.json()["result"] return response.json()["result"]
TOKEN = load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = load_env()["CLOUDFLARE_ACCOUNT_ID"]