From 0ce7446c375af76eb4aaa47fd5d31bd2599f6ae2 Mon Sep 17 00:00:00 2001 From: slashtechno <77907286+slashtechno@users.noreply.github.com> Date: Sun, 26 Mar 2023 22:16:26 +0000 Subject: [PATCH] Added deletion of DNS Policy --- .gitignore | 3 +- LICENSE | 7 ++ whitelist.txt | 0 ...lock-zerotrust.py => adblock_zerotrust.py} | 100 +++++++++++------- zerotrust_adblock/delete_adblock_zerotrust.py | 38 +++++-- zerotrust_adblock/utils.py | 67 ++++++++---- 6 files changed, 147 insertions(+), 68 deletions(-) create mode 100644 LICENSE create mode 100644 whitelist.txt rename zerotrust_adblock/{adblock-zerotrust.py => adblock_zerotrust.py} (51%) diff --git a/.gitignore b/.gitignore index b55843a..b7461af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .envrc __pycache__/ -tmp.txt \ No newline at end of file +tmp.txt +.env \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..47d35f1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,7 @@ +Copyright © 2023 slashtechno + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/whitelist.txt b/whitelist.txt new file mode 100644 index 0000000..e69de29 diff --git a/zerotrust_adblock/adblock-zerotrust.py b/zerotrust_adblock/adblock_zerotrust.py similarity index 51% rename from zerotrust_adblock/adblock-zerotrust.py rename to zerotrust_adblock/adblock_zerotrust.py index d03558d..a54dbe0 100644 --- a/zerotrust_adblock/adblock-zerotrust.py +++ b/zerotrust_adblock/adblock_zerotrust.py @@ -4,77 +4,94 @@ import utils import pathlib # Load environment variables -TOKEN = utils.load_env()['CLOUDFLARE_TOKEN'] -ACCOUNT_ID = utils.load_env()['CLOUDFLARE_ACCOUNT_ID'] +TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"] +ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"] + def get_blocklists(): # __file__ is a special variable that is the path to the current file - list_directory = pathlib.Path(__file__).parent.parent.joinpath('blocklists') + list_directory = pathlib.Path(__file__).parent.parent.joinpath("blocklists") for file in list_directory.iterdir(): blocklists = utils.convert_to_list(file) return blocklists + def apply_whitelists(blocklists): - whitelist = utils.convert_to_list(pathlib.Path(__file__).parent.parent.joinpath('whitelist.txt')) - blocklists = [x for x in blocklists if x not in whitelist] - return blocklists + whitelist = utils.convert_to_list( + pathlib.Path(__file__).parent.parent.joinpath("whitelist.txt") + ) + blocklists = [x for x in blocklists if x not in whitelist] + return blocklists + def split_list(blocklists): lists = [] - lists.extend([blocklists[i:i + 1000] for i in range(0, len(blocklists), 1000)]) # This is the same as the for loop below + lists.extend( + [blocklists[i : i + 1000] for i in range(0, len(blocklists), 1000)] + ) # This is the same as the for loop below # for i in range(0, len(blocklists), 1000): # # This is appending a list of 1000 domains to the lists list. It is doing this by slicing the blocklists list to get the first 1000 domains, then the next 1000 domains, etc. # lists.append(blocklists[i:i + 1000]) return lists + def upload_to_cloudflare(lists): # A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists for i, lst in enumerate(lists): - list_name = f'adblock-list-{i + 1}' - url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists' + list_name = f"adblock-list-{i + 1}" + url = ( + f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists" + ) headers = { - 'Authorization': f'Bearer {TOKEN}', + "Authorization": f"Bearer {TOKEN}", + "Content-Type": "application/json", + } + + data = { + "name": list_name, + "type": "DOMAIN", + "description": "A blocklist of ad domains", + # Writing this program, I have noticed how powerful list comprehension is. + "items": [ + { + "value": x, + } + for x in lst + ], + } + response = requests.post(url, headers=headers, json=data, timeout=10) + print(f"Uploaded {list_name} to Cloudflare") + if response.status_code != 200: + print(f"Error uploading {list_name}: {response.text}") + + +def create_dns_policy(lists): + url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules" + headers = { + "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json", - } - # For debugging: - with open('lists.txt', 'w') as f: - f.write(str([{'value': x,} for x in lst[:10]])) - # exit() - data = { - 'name': list_name, - 'type': 'DOMAIN', - 'description': 'A blocklist of ad domains', - # Writing this program, I have noticed how powerful list comprehension is. - 'items': [{'value': x,} for x in lst]} - response = requests.post(url, headers=headers, json=data) - print(f'Uploaded {list_name} to Cloudflare') - if response.status_code != 200: - print(f'Error uploading {list_name}: {response.text}') -def create_dns_policy(lists): - url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules' - headers = { - 'Authorization': f'Bearer {TOKEN}', - 'Content-Type': 'application/json' } # Construct the traffic string - traffic = '' + traffic = "" for i, lst in enumerate(lists): if i != 0: # ' or ' cannot be seen in the Zero Trs Dashboard - traffic += ' or ' + traffic += " or " traffic += f'any(dns.domains[*] in ${lst["id"]})' - print(traffic) + # print(traffic) data = { - 'name': 'Block Ads', - 'description': 'Block ad domains', - 'action': 'block', - 'traffic': traffic, + "name": "Block Ads", + "description": "Block ad domains", + "action": "block", + "traffic": traffic, + "enabled": True, } - response = requests.post(url, headers=headers, json=data) + response = requests.post(url, headers=headers, json=data, timeout=10) if response.status_code != 200: - print(f'Error creating DNS policy: {response.text}') + print(f"Error creating DNS policy: {response.text}") -if __name__ == '__main__': + +def main(): blocklists = get_blocklists() blocklists = apply_whitelists(blocklists) lists = split_list(blocklists) @@ -83,3 +100,6 @@ if __name__ == '__main__': cloud_lists = utils.filter_adblock_lists(cloud_lists) create_dns_policy(cloud_lists) + +if __name__ == "__main__": + main() diff --git a/zerotrust_adblock/delete_adblock_zerotrust.py b/zerotrust_adblock/delete_adblock_zerotrust.py index 9ad02ec..96d4bbb 100644 --- a/zerotrust_adblock/delete_adblock_zerotrust.py +++ b/zerotrust_adblock/delete_adblock_zerotrust.py @@ -1,30 +1,52 @@ # This is a scriprt to undo the changes made by adblock-zerotrust.py import requests -import os import utils # Load environment variables -TOKEN = utils.load_env()['CLOUDFLARE_TOKEN'] -ACCOUNT_ID = utils.load_env()['CLOUDFLARE_ACCOUNT_ID'] +TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"] +ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"] def delete_adblock_list(lists: dict): for lst in lists: url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists/{lst["id"]}' headers = { - 'Authorization': f'Bearer {TOKEN}', + "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json", } - response = requests.delete(url, headers=headers) + response = requests.delete(url, headers=headers, timeout=10) if response.status_code != 200: - print(f'Error deleting list: {response.text}') + print(f"Error deleting list: {response.text}") else: print(f'Deleted list {lst["name"]}') + + +def delete_adblock_policy(policies: dict): + for policy in policies: + if policy["name"] == "Block Ads": + url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules/{policy["id"]}' + headers = { + "Authorization": f"Bearer {TOKEN}", + "Content-Type": "application/json", + } + response = requests.delete(url, headers=headers, timeout=10) + if response.status_code != 200: + print(f"Error deleting policy: {response.text}") + else: + print(f'Deleted policy {policy["name"]}') + break + else: + print("No policy found") + + def main(): + rules = utils.get_gateway_rules() + delete_adblock_policy(rules) lists = utils.get_lists() lists = utils.filter_adblock_lists(lists) delete_adblock_list(lists) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/zerotrust_adblock/utils.py b/zerotrust_adblock/utils.py index c6647af..6bb4343 100644 --- a/zerotrust_adblock/utils.py +++ b/zerotrust_adblock/utils.py @@ -1,55 +1,84 @@ import pathlib -import os +import os import requests from dotenv import load_dotenv # List Utils + # Convert a hosts file to a simple hostname list -def convert_to_list(file:pathlib.Path) -> list: - with open(file, 'r') as f: +def convert_to_list(file: pathlib.Path) -> list: + with open(file, "r") as f: # Don't read commented lines; strip whitespace; remove 127.0.0.1 from beginning of line; ignore lines with "localhost"; ignore lines with "::1"; ignore newlines blocklists.extend([i[10:].strip() for i in f.readlines() if not i.startswith('#') and 'localhost' not in i and '::1' not in i]) - hosts = ([i[10:].strip() for i in f.readlines() if not i.startswith('#') and 'localhost' not in i and '::1' not in i]) + hosts = [ + i[10:].strip() + for i in f.readlines() + if not i.startswith("#") and "localhost" not in i and "::1" not in i + ] # Equivalent to: # for x in f.readlines(): # if not x.startswith('#') and 'localhost' not in x and '::1' not in x: # hosts.append(x[10:].strip()) # If there are any empty strings in the list, remove them since for some reason, whitespace is stil in the list - hosts = [i for i in hosts if i != ''] + hosts = [i for i in hosts if i != ""] return hosts + # General Utils + # Load environment variables def load_env() -> dict: load_dotenv(".env") - if not os.environ.get('CLOUDFLARE_TOKEN') and not os.environ.get('CLOUDFLARE_ACCOUNT_ID'): + if not os.environ.get("CLOUDFLARE_TOKEN") and not os.environ.get( + "CLOUDFLARE_ACCOUNT_ID" + ): load_dotenv(".envrc") - if not os.environ.get('CLOUDFLARE_TOKEN') or not os.environ.get('CLOUDFLARE_ACCOUNT_ID'): - print('No environment variables found. Please create a .env file or .envrc file') + if not os.environ.get("CLOUDFLARE_TOKEN") or not os.environ.get( + "CLOUDFLARE_ACCOUNT_ID" + ): + print( + "No environment variables found. Please create a .env file or .envrc file" + ) exit() - else: - return {"CLOUDFLARE_TOKEN": os.environ.get('CLOUDFLARE_TOKEN'), "CLOUDFLARE_ACCOUNT_ID": os.environ.get('CLOUDFLARE_ACCOUNT_ID')} - + else: + return { + "CLOUDFLARE_TOKEN": os.environ.get("CLOUDFLARE_TOKEN"), + "CLOUDFLARE_ACCOUNT_ID": os.environ.get("CLOUDFLARE_ACCOUNT_ID"), + } def get_lists() -> dict: - url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists' + url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists" headers = { - 'Authorization': f'Bearer {TOKEN}', + "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json", } - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, timeout=10) if response.status_code != 200: - print(f'Error getting lists: {response.text}') - return response.json()['result'] + print(f"Error getting lists: {response.text}") + return response.json()["result"] + def filter_adblock_lists(lists: dict) -> dict: adblock_lists = [] for lst in lists: - if lst['name'].startswith('adblock-list') and lst['type'] == 'DOMAIN': + if lst["name"].startswith("adblock-list") and lst["type"] == "DOMAIN": adblock_lists.append(lst) return adblock_lists -TOKEN = load_env()['CLOUDFLARE_TOKEN'] -ACCOUNT_ID = load_env()['CLOUDFLARE_ACCOUNT_ID'] + +def get_gateway_rules() -> dict: + url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules" + headers = { + "Authorization": f"Bearer {TOKEN}", + "Content-Type": "application/json", + } + response = requests.get(url, headers=headers, timeout=10) + if response.status_code != 200: + print(f"Error getting lists: {response.text}") + return response.json()["result"] + + +TOKEN = load_env()["CLOUDFLARE_TOKEN"] +ACCOUNT_ID = load_env()["CLOUDFLARE_ACCOUNT_ID"]