Added deletion of DNS Policy

This commit is contained in:
slashtechno 2023-03-26 22:16:26 +00:00 committed by GitHub
parent 2f64bbca27
commit 0ce7446c37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 147 additions and 68 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
.envrc
__pycache__/
tmp.txt
tmp.txt
.env

7
LICENSE Normal file
View File

@ -0,0 +1,7 @@
Copyright © 2023 slashtechno
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

0
whitelist.txt Normal file
View File

View File

@ -4,77 +4,94 @@ import utils
import pathlib
# Load environment variables
TOKEN = utils.load_env()['CLOUDFLARE_TOKEN']
ACCOUNT_ID = utils.load_env()['CLOUDFLARE_ACCOUNT_ID']
TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"]
def get_blocklists():
# __file__ is a special variable that is the path to the current file
list_directory = pathlib.Path(__file__).parent.parent.joinpath('blocklists')
list_directory = pathlib.Path(__file__).parent.parent.joinpath("blocklists")
for file in list_directory.iterdir():
blocklists = utils.convert_to_list(file)
return blocklists
def apply_whitelists(blocklists):
whitelist = utils.convert_to_list(pathlib.Path(__file__).parent.parent.joinpath('whitelist.txt'))
blocklists = [x for x in blocklists if x not in whitelist]
return blocklists
whitelist = utils.convert_to_list(
pathlib.Path(__file__).parent.parent.joinpath("whitelist.txt")
)
blocklists = [x for x in blocklists if x not in whitelist]
return blocklists
def split_list(blocklists):
lists = []
lists.extend([blocklists[i:i + 1000] for i in range(0, len(blocklists), 1000)]) # This is the same as the for loop below
lists.extend(
[blocklists[i : i + 1000] for i in range(0, len(blocklists), 1000)]
) # This is the same as the for loop below
# for i in range(0, len(blocklists), 1000):
# # This is appending a list of 1000 domains to the lists list. It is doing this by slicing the blocklists list to get the first 1000 domains, then the next 1000 domains, etc.
# lists.append(blocklists[i:i + 1000])
return lists
def upload_to_cloudflare(lists):
# A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists
for i, lst in enumerate(lists):
list_name = f'adblock-list-{i + 1}'
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists'
list_name = f"adblock-list-{i + 1}"
url = (
f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists"
)
headers = {
'Authorization': f'Bearer {TOKEN}',
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
data = {
"name": list_name,
"type": "DOMAIN",
"description": "A blocklist of ad domains",
# Writing this program, I have noticed how powerful list comprehension is.
"items": [
{
"value": x,
}
for x in lst
],
}
response = requests.post(url, headers=headers, json=data, timeout=10)
print(f"Uploaded {list_name} to Cloudflare")
if response.status_code != 200:
print(f"Error uploading {list_name}: {response.text}")
def create_dns_policy(lists):
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules"
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
# For debugging:
with open('lists.txt', 'w') as f:
f.write(str([{'value': x,} for x in lst[:10]]))
# exit()
data = {
'name': list_name,
'type': 'DOMAIN',
'description': 'A blocklist of ad domains',
# Writing this program, I have noticed how powerful list comprehension is.
'items': [{'value': x,} for x in lst]}
response = requests.post(url, headers=headers, json=data)
print(f'Uploaded {list_name} to Cloudflare')
if response.status_code != 200:
print(f'Error uploading {list_name}: {response.text}')
def create_dns_policy(lists):
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules'
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# Construct the traffic string
traffic = ''
traffic = ""
for i, lst in enumerate(lists):
if i != 0:
# ' or ' cannot be seen in the Zero Trs Dashboard
traffic += ' or '
traffic += " or "
traffic += f'any(dns.domains[*] in ${lst["id"]})'
print(traffic)
# print(traffic)
data = {
'name': 'Block Ads',
'description': 'Block ad domains',
'action': 'block',
'traffic': traffic,
"name": "Block Ads",
"description": "Block ad domains",
"action": "block",
"traffic": traffic,
"enabled": True,
}
response = requests.post(url, headers=headers, json=data)
response = requests.post(url, headers=headers, json=data, timeout=10)
if response.status_code != 200:
print(f'Error creating DNS policy: {response.text}')
print(f"Error creating DNS policy: {response.text}")
if __name__ == '__main__':
def main():
blocklists = get_blocklists()
blocklists = apply_whitelists(blocklists)
lists = split_list(blocklists)
@ -83,3 +100,6 @@ if __name__ == '__main__':
cloud_lists = utils.filter_adblock_lists(cloud_lists)
create_dns_policy(cloud_lists)
if __name__ == "__main__":
main()

View File

@ -1,30 +1,52 @@
# This is a scriprt to undo the changes made by adblock-zerotrust.py
import requests
import os
import utils
# Load environment variables
TOKEN = utils.load_env()['CLOUDFLARE_TOKEN']
ACCOUNT_ID = utils.load_env()['CLOUDFLARE_ACCOUNT_ID']
TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"]
def delete_adblock_list(lists: dict):
for lst in lists:
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists/{lst["id"]}'
headers = {
'Authorization': f'Bearer {TOKEN}',
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
response = requests.delete(url, headers=headers)
response = requests.delete(url, headers=headers, timeout=10)
if response.status_code != 200:
print(f'Error deleting list: {response.text}')
print(f"Error deleting list: {response.text}")
else:
print(f'Deleted list {lst["name"]}')
def delete_adblock_policy(policies: dict):
for policy in policies:
if policy["name"] == "Block Ads":
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules/{policy["id"]}'
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
response = requests.delete(url, headers=headers, timeout=10)
if response.status_code != 200:
print(f"Error deleting policy: {response.text}")
else:
print(f'Deleted policy {policy["name"]}')
break
else:
print("No policy found")
def main():
rules = utils.get_gateway_rules()
delete_adblock_policy(rules)
lists = utils.get_lists()
lists = utils.filter_adblock_lists(lists)
delete_adblock_list(lists)
if __name__ == '__main__':
main()
if __name__ == "__main__":
main()

View File

@ -1,55 +1,84 @@
import pathlib
import os
import os
import requests
from dotenv import load_dotenv
# List Utils
# Convert a hosts file to a simple hostname list
def convert_to_list(file:pathlib.Path) -> list:
with open(file, 'r') as f:
def convert_to_list(file: pathlib.Path) -> list:
with open(file, "r") as f:
# Don't read commented lines; strip whitespace; remove 127.0.0.1 from beginning of line; ignore lines with "localhost"; ignore lines with "::1"; ignore newlines blocklists.extend([i[10:].strip() for i in f.readlines() if not i.startswith('#') and 'localhost' not in i and '::1' not in i])
hosts = ([i[10:].strip() for i in f.readlines() if not i.startswith('#') and 'localhost' not in i and '::1' not in i])
hosts = [
i[10:].strip()
for i in f.readlines()
if not i.startswith("#") and "localhost" not in i and "::1" not in i
]
# Equivalent to:
# for x in f.readlines():
# if not x.startswith('#') and 'localhost' not in x and '::1' not in x:
# hosts.append(x[10:].strip())
# If there are any empty strings in the list, remove them since for some reason, whitespace is stil in the list
hosts = [i for i in hosts if i != '']
hosts = [i for i in hosts if i != ""]
return hosts
# General Utils
# Load environment variables
def load_env() -> dict:
load_dotenv(".env")
if not os.environ.get('CLOUDFLARE_TOKEN') and not os.environ.get('CLOUDFLARE_ACCOUNT_ID'):
if not os.environ.get("CLOUDFLARE_TOKEN") and not os.environ.get(
"CLOUDFLARE_ACCOUNT_ID"
):
load_dotenv(".envrc")
if not os.environ.get('CLOUDFLARE_TOKEN') or not os.environ.get('CLOUDFLARE_ACCOUNT_ID'):
print('No environment variables found. Please create a .env file or .envrc file')
if not os.environ.get("CLOUDFLARE_TOKEN") or not os.environ.get(
"CLOUDFLARE_ACCOUNT_ID"
):
print(
"No environment variables found. Please create a .env file or .envrc file"
)
exit()
else:
return {"CLOUDFLARE_TOKEN": os.environ.get('CLOUDFLARE_TOKEN'), "CLOUDFLARE_ACCOUNT_ID": os.environ.get('CLOUDFLARE_ACCOUNT_ID')}
else:
return {
"CLOUDFLARE_TOKEN": os.environ.get("CLOUDFLARE_TOKEN"),
"CLOUDFLARE_ACCOUNT_ID": os.environ.get("CLOUDFLARE_ACCOUNT_ID"),
}
def get_lists() -> dict:
url = f'https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists'
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists"
headers = {
'Authorization': f'Bearer {TOKEN}',
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code != 200:
print(f'Error getting lists: {response.text}')
return response.json()['result']
print(f"Error getting lists: {response.text}")
return response.json()["result"]
def filter_adblock_lists(lists: dict) -> dict:
adblock_lists = []
for lst in lists:
if lst['name'].startswith('adblock-list') and lst['type'] == 'DOMAIN':
if lst["name"].startswith("adblock-list") and lst["type"] == "DOMAIN":
adblock_lists.append(lst)
return adblock_lists
TOKEN = load_env()['CLOUDFLARE_TOKEN']
ACCOUNT_ID = load_env()['CLOUDFLARE_ACCOUNT_ID']
def get_gateway_rules() -> dict:
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules"
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code != 200:
print(f"Error getting lists: {response.text}")
return response.json()["result"]
TOKEN = load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = load_env()["CLOUDFLARE_ACCOUNT_ID"]