cloudflare-gateway-adblocking/zerotrust_adblock/adblock_zerotrust.py

107 lines
3.4 KiB
Python
Raw Normal View History

2023-03-20 18:40:08 +00:00
import os
import pathlib
2023-03-20 18:40:08 +00:00
import requests
import utils
# Load environment variables
2023-03-26 23:16:26 +01:00
TOKEN = utils.load_env()["CLOUDFLARE_TOKEN"]
ACCOUNT_ID = utils.load_env()["CLOUDFLARE_ACCOUNT_ID"]
2023-03-20 18:40:08 +00:00
def get_blocklists():
# __file__ is a special variable that is the path to the current file
2023-03-26 23:16:26 +01:00
list_directory = pathlib.Path(__file__).parent.parent.joinpath("blocklists")
2023-03-20 18:40:08 +00:00
for file in list_directory.iterdir():
blocklists = utils.convert_to_list(file)
return blocklists
2023-03-26 23:16:26 +01:00
2023-03-20 18:40:08 +00:00
def apply_whitelists(blocklists):
2023-03-26 23:16:26 +01:00
whitelist = utils.convert_to_list(
pathlib.Path(__file__).parent.parent.joinpath("whitelist.txt")
)
blocklists = [x for x in blocklists if x not in whitelist]
return blocklists
2023-03-20 18:40:08 +00:00
def split_list(blocklists):
lists = []
2023-03-26 23:16:26 +01:00
lists.extend(
[blocklists[i : i + 1000] for i in range(0, len(blocklists), 1000)]
) # This is the same as the for loop below
2023-03-20 18:40:08 +00:00
# for i in range(0, len(blocklists), 1000):
# # This is appending a list of 1000 domains to the lists list. It is doing this by slicing the blocklists list to get the first 1000 domains, then the next 1000 domains, etc.
# lists.append(blocklists[i:i + 1000])
return lists
2023-03-26 23:16:26 +01:00
2023-03-20 18:40:08 +00:00
def upload_to_cloudflare(lists):
# A: It's iterating over the lists and uploading them to Cloudflare, the enumerate function is used to get the index of the list since lists is a list of lists
for i, lst in enumerate(lists):
2023-03-26 23:16:26 +01:00
list_name = f"adblock-list-{i + 1}"
url = (
f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/lists"
)
2023-03-20 18:40:08 +00:00
headers = {
2023-03-26 23:16:26 +01:00
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
2023-03-20 18:40:08 +00:00
data = {
2023-03-26 23:16:26 +01:00
"name": list_name,
"type": "DOMAIN",
"description": "A blocklist of ad domains",
# Writing this program, I have noticed how powerful list comprehension is.
"items": [
{
"value": x,
}
for x in lst
],
}
response = requests.post(url, headers=headers, json=data, timeout=10)
print(f"Uploaded {list_name} to Cloudflare")
2023-03-20 18:40:08 +00:00
if response.status_code != 200:
2023-03-26 23:16:26 +01:00
print(f"Error uploading {list_name}: {response.text}")
2023-03-20 18:40:08 +00:00
def create_dns_policy(lists):
2023-03-26 23:16:26 +01:00
url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/gateway/rules"
2023-03-20 18:40:08 +00:00
headers = {
2023-03-26 23:16:26 +01:00
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
2023-03-20 18:40:08 +00:00
}
# Construct the traffic string
2023-03-26 23:16:26 +01:00
traffic = ""
2023-03-20 18:40:08 +00:00
for i, lst in enumerate(lists):
if i != 0:
# ' or ' cannot be seen in the Zero Trs Dashboard
2023-03-26 23:16:26 +01:00
traffic += " or "
2023-03-20 18:40:08 +00:00
traffic += f'any(dns.domains[*] in ${lst["id"]})'
2023-03-26 23:16:26 +01:00
# print(traffic)
2023-03-20 18:40:08 +00:00
data = {
2023-03-26 23:16:26 +01:00
"name": "Block Ads",
"description": "Block ad domains",
"action": "block",
"traffic": traffic,
"enabled": True,
2023-03-20 18:40:08 +00:00
}
2023-03-26 23:16:26 +01:00
response = requests.post(url, headers=headers, json=data, timeout=10)
2023-03-20 18:40:08 +00:00
if response.status_code != 200:
2023-03-26 23:16:26 +01:00
print(f"Error creating DNS policy: {response.text}")
2023-03-20 18:40:08 +00:00
2023-03-26 23:16:26 +01:00
def main():
2023-03-20 18:40:08 +00:00
blocklists = get_blocklists()
blocklists = apply_whitelists(blocklists)
lists = split_list(blocklists)
upload_to_cloudflare(lists)
cloud_lists = utils.get_lists()
cloud_lists = utils.filter_adblock_lists(cloud_lists)
create_dns_policy(cloud_lists)
2023-03-26 23:16:26 +01:00
if __name__ == "__main__":
main()