Compare commits

...

2 Commits

Author SHA1 Message Date
slashtechno df2a598c9e
Remove unused imports 2023-08-01 12:20:18 -05:00
slashtechno 32f1cf588e
Allow directory to be passed as `--whitelist`
Improve error handling
2023-08-01 12:16:31 -05:00
8 changed files with 221 additions and 93 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ blocklists/*
tmp.py tmp.py
.venv .venv
dist/ dist/
.ruff_cache/

120
poetry.lock generated
View File

@ -1,5 +1,50 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. # This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "black"
version = "23.7.0"
description = "The uncompromising code formatter."
optional = false
python-versions = ">=3.8"
files = [
{file = "black-23.7.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:5c4bc552ab52f6c1c506ccae05681fab58c3f72d59ae6e6639e8885e94fe2587"},
{file = "black-23.7.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:552513d5cd5694590d7ef6f46e1767a4df9af168d449ff767b13b084c020e63f"},
{file = "black-23.7.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:86cee259349b4448adb4ef9b204bb4467aae74a386bce85d56ba4f5dc0da27be"},
{file = "black-23.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:501387a9edcb75d7ae8a4412bb8749900386eaef258f1aefab18adddea1936bc"},
{file = "black-23.7.0-cp310-cp310-win_amd64.whl", hash = "sha256:fb074d8b213749fa1d077d630db0d5f8cc3b2ae63587ad4116e8a436e9bbe995"},
{file = "black-23.7.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:b5b0ee6d96b345a8b420100b7d71ebfdd19fab5e8301aff48ec270042cd40ac2"},
{file = "black-23.7.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:893695a76b140881531062d48476ebe4a48f5d1e9388177e175d76234ca247cd"},
{file = "black-23.7.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:c333286dc3ddca6fdff74670b911cccedacb4ef0a60b34e491b8a67c833b343a"},
{file = "black-23.7.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:831d8f54c3a8c8cf55f64d0422ee875eecac26f5f649fb6c1df65316b67c8926"},
{file = "black-23.7.0-cp311-cp311-win_amd64.whl", hash = "sha256:7f3bf2dec7d541b4619b8ce526bda74a6b0bffc480a163fed32eb8b3c9aed8ad"},
{file = "black-23.7.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:f9062af71c59c004cd519e2fb8f5d25d39e46d3af011b41ab43b9c74e27e236f"},
{file = "black-23.7.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:01ede61aac8c154b55f35301fac3e730baf0c9cf8120f65a9cd61a81cfb4a0c3"},
{file = "black-23.7.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:327a8c2550ddc573b51e2c352adb88143464bb9d92c10416feb86b0f5aee5ff6"},
{file = "black-23.7.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d1c6022b86f83b632d06f2b02774134def5d4d4f1dac8bef16d90cda18ba28a"},
{file = "black-23.7.0-cp38-cp38-win_amd64.whl", hash = "sha256:27eb7a0c71604d5de083757fbdb245b1a4fae60e9596514c6ec497eb63f95320"},
{file = "black-23.7.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:8417dbd2f57b5701492cd46edcecc4f9208dc75529bcf76c514864e48da867d9"},
{file = "black-23.7.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:47e56d83aad53ca140da0af87678fb38e44fd6bc0af71eebab2d1f59b1acf1d3"},
{file = "black-23.7.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:25cc308838fe71f7065df53aedd20327969d05671bac95b38fdf37ebe70ac087"},
{file = "black-23.7.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:642496b675095d423f9b8448243336f8ec71c9d4d57ec17bf795b67f08132a91"},
{file = "black-23.7.0-cp39-cp39-win_amd64.whl", hash = "sha256:ad0014efc7acf0bd745792bd0d8857413652979200ab924fbf239062adc12491"},
{file = "black-23.7.0-py3-none-any.whl", hash = "sha256:9fd59d418c60c0348505f2ddf9609c1e1de8e7493eab96198fc89d9f865e7a96"},
{file = "black-23.7.0.tar.gz", hash = "sha256:022a582720b0d9480ed82576c920a8c1dde97cc38ff11d8d8859b3bd6ca9eedb"},
]
[package.dependencies]
click = ">=8.0.0"
mypy-extensions = ">=0.4.3"
packaging = ">=22.0"
pathspec = ">=0.9.0"
platformdirs = ">=2"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
[package.extras]
colorama = ["colorama (>=0.4.3)"]
d = ["aiohttp (>=3.7.4)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]] [[package]]
name = "certifi" name = "certifi"
version = "2023.7.22" version = "2023.7.22"
@ -95,6 +140,20 @@ files = [
{file = "charset_normalizer-3.2.0-py3-none-any.whl", hash = "sha256:8e098148dd37b4ce3baca71fb394c81dc5d9c7728c95df695d2dca218edf40e6"}, {file = "charset_normalizer-3.2.0-py3-none-any.whl", hash = "sha256:8e098148dd37b4ce3baca71fb394c81dc5d9c7728c95df695d2dca218edf40e6"},
] ]
[[package]]
name = "click"
version = "8.1.6"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.6-py3-none-any.whl", hash = "sha256:fa244bb30b3b5ee2cae3da8f55c9e5e0c0e86093306301fb418eb9dc40fbded5"},
{file = "click-8.1.6.tar.gz", hash = "sha256:48ee849951919527a045bfe3bf7baa8a959c423134e1a5b98c05c20ba75a1cbd"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]] [[package]]
name = "colorama" name = "colorama"
version = "0.4.6" version = "0.4.6"
@ -135,6 +194,54 @@ win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
[package.extras] [package.extras]
dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"] dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"]
[[package]]
name = "mypy-extensions"
version = "1.0.0"
description = "Type system extensions for programs checked with the mypy type checker."
optional = false
python-versions = ">=3.5"
files = [
{file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
{file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
]
[[package]]
name = "packaging"
version = "23.1"
description = "Core utilities for Python packages"
optional = false
python-versions = ">=3.7"
files = [
{file = "packaging-23.1-py3-none-any.whl", hash = "sha256:994793af429502c4ea2ebf6bf664629d07c1a9fe974af92966e4b8d2df7edc61"},
{file = "packaging-23.1.tar.gz", hash = "sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f"},
]
[[package]]
name = "pathspec"
version = "0.11.2"
description = "Utility library for gitignore style pattern matching of file paths."
optional = false
python-versions = ">=3.7"
files = [
{file = "pathspec-0.11.2-py3-none-any.whl", hash = "sha256:1d6ed233af05e679efb96b1851550ea95bbb64b7c490b0f5aa52996c11e92a20"},
{file = "pathspec-0.11.2.tar.gz", hash = "sha256:e0d8d0ac2f12da61956eb2306b69f9469b42f4deb0f3cb6ed47b9cce9996ced3"},
]
[[package]]
name = "platformdirs"
version = "3.10.0"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
optional = false
python-versions = ">=3.7"
files = [
{file = "platformdirs-3.10.0-py3-none-any.whl", hash = "sha256:d7c24979f292f916dc9cbf8648319032f551ea8c49a4c9bf2fb556a02070ec1d"},
{file = "platformdirs-3.10.0.tar.gz", hash = "sha256:b45696dab2d7cc691a3226759c0d3b00c47c8b6e293d96f6436f733303f77f6d"},
]
[package.extras]
docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.1)", "sphinx-autodoc-typehints (>=1.24)"]
test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4)", "pytest-cov (>=4.1)", "pytest-mock (>=3.11.1)"]
[[package]] [[package]]
name = "python-dotenv" name = "python-dotenv"
version = "1.0.0" version = "1.0.0"
@ -196,6 +303,17 @@ files = [
{file = "ruff-0.0.281.tar.gz", hash = "sha256:bab2cdfa78754315cccc2b4d46ad6181aabb29e89747a3b135a4b85e11baa025"}, {file = "ruff-0.0.281.tar.gz", hash = "sha256:bab2cdfa78754315cccc2b4d46ad6181aabb29e89747a3b135a4b85e11baa025"},
] ]
[[package]]
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
optional = false
python-versions = ">=3.7"
files = [
{file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
{file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
]
[[package]] [[package]]
name = "urllib3" name = "urllib3"
version = "2.0.4" version = "2.0.4"
@ -230,4 +348,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.10" python-versions = "^3.10"
content-hash = "8737765a4f30f43f2314296861d56c04682f5632f54ecd7b2046b4d9591ed101" content-hash = "d1e33ec5def7b19f6fd09ac80725a9a9f24da3274b075ee37eb53fe2b2f54219"

View File

@ -5,6 +5,9 @@ description = "Serverless adblocking via Cloudflare Zero Trust Gateway"
authors = ["slastechno <77907286+slashtechno@users.noreply.github.com>"] authors = ["slastechno <77907286+slashtechno@users.noreply.github.com>"]
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
repository = "https://github.com/slashtechno/cloudflare-gateway-adblocking"
keywords = ["cloudflare", "dns", "adblocking", "serverless"]
packages = [{include = "src"}] packages = [{include = "src"}]
[tool.poetry.scripts] [tool.poetry.scripts]
@ -19,6 +22,7 @@ python-dotenv = "^1.0.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
ruff = "^0.0.281" ruff = "^0.0.281"
black = "^23.7.0"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View File

@ -12,16 +12,16 @@ from pathlib import Path
TOKEN = None TOKEN = None
ACCOUNT_ID = None ACCOUNT_ID = None
def main(): def main():
# Setup logging # Setup logging
logger.remove() logger.remove()
# ^10 is a formatting directive to center with a padding of 10 # ^10 is a formatting directive to center with a padding of 10
logger_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> |<level>{level: ^10}</level>| <level>{message}</level>" # pylint: disable=line-too-long logger_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> |<level>{level: ^10}</level>| <level>{message}</level>" # noqa E501
logger.add(stderr, format=logger_format, colorize=True, level="DEBUG") logger.add(stderr, format=logger_format, colorize=True, level="DEBUG")
# Load .env if it exists # Load .env if it exists
# This must precede the argparse setup as os.environ is used in the default arguments # This must precede the argparse setup as env variables are used as default values
if Path(".env").is_file(): if Path(".env").is_file():
dotenv.load_dotenv() dotenv.load_dotenv()
logger.info("Loaded .env file") logger.info("Loaded .env file")
@ -32,25 +32,26 @@ def main():
# Set up argparse # Set up argparse
argparser = argparse.ArgumentParser( argparser = argparse.ArgumentParser(
prog='Cloudflare Gateway Adblocking', prog="Cloudflare Gateway Adblocking",
description='Serverless adblocking via Cloudflare Zero Trust Gateway', description="Serverless adblocking via Cloudflare Zero Trust Gateway",
epilog=':)' epilog=":)",
) )
# Add argument groups # Add argument groups
credential_args = argparser.add_argument_group('Cloudflare Credentials') credential_args = argparser.add_argument_group("Cloudflare Credentials")
# Add arguments # Add arguments
credential_args.add_argument( credential_args.add_argument(
"--account-id", "--account-id",
"-a", "-a",
help="Cloudflare account ID - environment variable: CLOUDFLARE_ACCOUNT_ID", help="Cloudflare account ID - environment variable: CLOUDFLARE_ACCOUNT_ID",
default=os.environ.get("CLOUDFLARE_ACCOUNT_ID") default=os.environ.get("CLOUDFLARE_ACCOUNT_ID"),
) )
credential_args.add_argument('--token', credential_args.add_argument(
'-t', "--token",
help='Cloudflare API token - environment variable: CLOUDFLARE_TOKEN', "-t",
default=os.environ.get("CLOUDFLARE_TOKEN") help="Cloudflare API token - environment variable: CLOUDFLARE_TOKEN",
default=os.environ.get("CLOUDFLARE_TOKEN"),
) )
# Add subcommands # Add subcommands
@ -58,31 +59,28 @@ def main():
title="subcommands", title="subcommands",
description="", description="",
help="Subcommands to preform operations", help="Subcommands to preform operations",
dest="subcommand" dest="subcommand",
) )
# Add subcommand: upload # Add subcommand: upload
upload_parser = subparsers.add_parser( upload_parser = subparsers.add_parser(
"upload", "upload", help="Upload adblock lists to Cloudflare"
help="Upload adblock lists to Cloudflare"
) )
upload_parser.set_defaults(func=upload_to_cloudflare) upload_parser.set_defaults(func=upload_to_cloudflare)
upload_parser.add_argument( upload_parser.add_argument(
"--blocklists", "--blocklists",
"-b", "-b",
help="Either a blocklist hosts file or a directory containing blocklist hosts files", help="Either a blocklist hosts file or a directory containing blocklist hosts files", # noqa E501
default="blocklists" default="blocklists",
) )
upload_parser.add_argument( upload_parser.add_argument(
"--whitelists", "--whitelists",
"-w", "-w",
# help="Either a whitelist hosts file or a directory containing whitelist hosts files" help="Either a whitelist hosts file or a directory containing whitelist hosts files", # noqa E501
help="Whitelist hosts file or directory", default="whitelist.txt", # Need to change this so it's optional
default="whitelist.txt" # Need to change this so it's optional
) )
# Add subcommand: delete # Add subcommand: delete
delete_parser = subparsers.add_parser( delete_parser = subparsers.add_parser(
"delete", "delete", help="Delete adblock lists from Cloudflare"
help="Delete adblock lists from Cloudflare"
) )
delete_parser.set_defaults(func=delete_from_cloudflare) delete_parser.set_defaults(func=delete_from_cloudflare)
@ -97,14 +95,17 @@ def main():
ACCOUNT_ID = args.account_id ACCOUNT_ID = args.account_id
# Check if variables are set # Check if variables are set
if TOKEN is None or ACCOUNT_ID is None: if TOKEN is None or ACCOUNT_ID is None:
logger.error("No environment variables found. Please create a .env file or .envrc file") # noqa E501 logger.error(
"No environment variables found. Please create a .env file or .envrc file"
) # noqa E501
exit(1) exit(1)
try : try:
args.func(args) args.func(args)
except AttributeError as e: except AttributeError as e:
logger.debug(e) logger.debug(e)
argparser.print_help() argparser.print_help()
def upload_to_cloudflare(args): def upload_to_cloudflare(args):
logger.info("Uploading to Cloudflare") logger.info("Uploading to Cloudflare")
blocklists = upload.get_blocklists(args.blocklists) blocklists = upload.get_blocklists(args.blocklists)
@ -114,6 +115,8 @@ def upload_to_cloudflare(args):
cloud_lists = utils.get_lists(ACCOUNT_ID, TOKEN) cloud_lists = utils.get_lists(ACCOUNT_ID, TOKEN)
cloud_lists = utils.filter_adblock_lists(cloud_lists) cloud_lists = utils.filter_adblock_lists(cloud_lists)
upload.create_dns_policy(cloud_lists, ACCOUNT_ID, TOKEN) upload.create_dns_policy(cloud_lists, ACCOUNT_ID, TOKEN)
def delete_from_cloudflare(args): def delete_from_cloudflare(args):
logger.info("Deleting from Cloudflare") logger.info("Deleting from Cloudflare")
rules = utils.get_gateway_rules(ACCOUNT_ID, TOKEN) rules = utils.get_gateway_rules(ACCOUNT_ID, TOKEN)
@ -122,5 +125,6 @@ def delete_from_cloudflare(args):
lists = utils.filter_adblock_lists(lists) lists = utils.filter_adblock_lists(lists)
delete.delete_adblock_list(lists, ACCOUNT_ID, TOKEN) delete.delete_adblock_list(lists, ACCOUNT_ID, TOKEN)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -5,17 +5,23 @@ from . import utils
def delete_adblock_list(lists: dict, account_id: str, token: str): def delete_adblock_list(lists: dict, account_id: str, token: str):
for lst in lists: try:
url = f'https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists/{lst["id"]}' for lst in lists:
headers = { url = f'https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists/{lst["id"]}'
"Authorization": f"Bearer {token}", headers = {
"Content-Type": "application/json", "Authorization": f"Bearer {token}",
} "Content-Type": "application/json",
response = requests.delete(url, headers=headers, timeout=10) }
if response.status_code != 200: response = requests.delete(url, headers=headers, timeout=10)
print(f"Error deleting list: {response.text}") if response.status_code != 200:
print(f"Error deleting list: {response.text}")
else:
print(f'Deleted list {lst["name"]}')
except TypeError as e:
if str(e) == "'NoneType' object is not iterable":
print("No lists found")
else: else:
print(f'Deleted list {lst["name"]}') raise e
def delete_adblock_policy(policies: dict, account_id: str, token: str): def delete_adblock_policy(policies: dict, account_id: str, token: str):

View File

@ -3,21 +3,32 @@ from . import utils
import pathlib import pathlib
def get_blocklists(path_to_blocklists: str = None): def get_blocklists(hosts_path: str = None):
# __file__ is a special variable that is the path to the current file blocklists = []
# list_directory = pathlib.Path(__file__).parent.parent.joinpath("blocklists") hosts_path = pathlib.Path(hosts_path)
if hosts_path.is_file():
list_directory = pathlib.Path(path_to_blocklists) blocklists = utils.convert_to_list(hosts_path)
for file in list_directory.iterdir(): elif hosts_path.is_dir():
blocklists = utils.convert_to_list(file) for file in hosts_path.iterdir():
blocklists.extend(utils.convert_to_list(file))
else:
raise ValueError("Invalid hosts file or directory")
return blocklists return blocklists
def apply_whitelists(blocklists, whitelist: str = None): def apply_whitelists(blocklists, whitelist: str = None):
whitelist = utils.convert_to_list( # If whitelist is a file, convert it to a list.
# pathlib.Path(__file__).parent.parent.joinpath("whitelist.txt") # If whitelist is a directory, convert all files in it to a list and combine them.
pathlib.Path(whitelist) # If it does not exist, return the original blocklists
) whitelist_path = pathlib.Path(whitelist)
if whitelist_path.is_file():
whitelist = utils.convert_to_list(whitelist_path)
elif whitelist_path.is_dir():
whitelist = []
for file in whitelist_path.iterdir():
whitelist.extend(utils.convert_to_list(file))
else:
return blocklists
blocklists = [x for x in blocklists if x not in whitelist] blocklists = [x for x in blocklists if x not in whitelist]
return blocklists return blocklists
@ -28,13 +39,12 @@ def split_list(blocklists):
[blocklists[i : i + 1000] for i in range(0, len(blocklists), 1000)] [blocklists[i : i + 1000] for i in range(0, len(blocklists), 1000)]
) # This is the same as the for loop below ) # This is the same as the for loop below
# for i in range(0, len(blocklists), 1000): # for i in range(0, len(blocklists), 1000):
# # This is appending a list of 1000 domains to the lists list. It is doing this by slicing the blocklists list to get the first 1000 domains, then the next 1000 domains, etc. # This continues to append lists of 1000 items to the lists list via slicing
# lists.append(blocklists[i:i + 1000]) # lists.append(blocklists[i:i + 1000])
return lists return lists
def upload_to_cloudflare(lists, account_id: str, token: str) -> None: def upload_to_cloudflare(lists, account_id: str, token: str) -> None:
# A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists
for i, lst in enumerate(lists): for i, lst in enumerate(lists):
list_name = f"adblock-list-{i + 1}" list_name = f"adblock-list-{i + 1}"
url = ( url = (
@ -98,5 +108,6 @@ def main():
cloud_lists = utils.filter_adblock_lists(cloud_lists) cloud_lists = utils.filter_adblock_lists(cloud_lists)
create_dns_policy(cloud_lists) create_dns_policy(cloud_lists)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,8 +1,7 @@
import os
import pathlib import pathlib
import requests import requests
from dotenv import load_dotenv
class Config: class Config:
def __init__(self, token, account_id): def __init__(self, token, account_id):
@ -14,28 +13,31 @@ class Config:
@property @property
def token(self): def token(self):
return self._token return self._token
@token.setter @token.setter
def token(self, token): def token(self, token):
if token is None: if token is None:
raise ValueError("No token provided") raise ValueError("No token provided")
else: else:
url = 'https://api.cloudflare.com/client/v4/user/tokens/verify' url = "https://api.cloudflare.com/client/v4/user/tokens/verify"
headers = { headers = {
'Authorization': f'Bearer {token}', "Authorization": f"Bearer {token}",
'Content-Type': 'application/json' "Content-Type": "application/json",
} }
response = requests.get(url, headers=headers, timeout=10) response = requests.get(url, headers=headers, timeout=10)
if response.json()['success'] is False: if response.json()["success"] is False:
raise ValueError('Invalid token') raise ValueError("Invalid token")
else: else:
# Token needs the following scopes: # Token needs the following scopes:
# Zero Trust: Read/Edit # Zero Trust: Read/Edit
# Account Firewall Access Rules: Read/Edit # Account Firewall Access Rules: Read/Edit
# Access Apps and Policies: Read/Edit # Access Apps and Policies: Read/Edit
self._token = token self._token = token
@property @property
def account_id(self): def account_id(self):
return self._account_id return self._account_id
@account_id.setter @account_id.setter
def account_id(self, account_id): def account_id(self, account_id):
if account_id is None: if account_id is None:
@ -45,9 +47,6 @@ class Config:
self._account_id = account_id self._account_id = account_id
# List Utils # List Utils
@ -77,27 +76,6 @@ def convert_to_list(file: pathlib.Path) -> list:
# General Utils # General Utils
# Load environment variables
# def load_env() -> dict:
# load_dotenv(".env")
# if not os.environ.get("CLOUDFLARE_TOKEN") and not os.environ.get(
# "CLOUDFLARE_ACCOUNT_ID"
# ):
# load_dotenv(".envrc")
# if not os.environ.get("CLOUDFLARE_TOKEN") or not os.environ.get(
# "CLOUDFLARE_ACCOUNT_ID"
# ):
# print(
# "No environment variables found. Please create a .env file or .envrc file"
# )
# exit()
# else:
# return {
# "CLOUDFLARE_TOKEN": os.environ.get("CLOUDFLARE_TOKEN"),
# "CLOUDFLARE_ACCOUNT_ID": os.environ.get("CLOUDFLARE_ACCOUNT_ID"),
# }
def get_lists(account_id, token) -> dict: def get_lists(account_id, token) -> dict:
url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists" url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/gateway/lists"
headers = { headers = {
@ -112,9 +90,15 @@ def get_lists(account_id, token) -> dict:
def filter_adblock_lists(lists: dict) -> dict: def filter_adblock_lists(lists: dict) -> dict:
adblock_lists = [] adblock_lists = []
for lst in lists: try:
if lst["name"].startswith("adblock-list") and lst["type"] == "DOMAIN": for lst in lists:
adblock_lists.append(lst) if lst["name"].startswith("adblock-list") and lst["type"] == "DOMAIN":
adblock_lists.append(lst)
except TypeError as e:
if str(e) == "'NoneType' object is not iterable":
print("No lists found")
else:
raise e
return adblock_lists return adblock_lists

View File