Simulate Bot Traffic #162

Open
wants to merge 2 commits into
base: master
74 changes: 74 additions & 0 deletions dags/app/cloudflare_bot/dag.py
@@ -0,0 +1,74 @@
"""
Create bot network traffic
"""
import logging
from datetime import datetime, timedelta

import requests
from airflow.sdk import Variable, dag, task

DEFAULT_ARGS = {
    "owner": "Henry Lee",
    "depends_on_past": False,
    "start_date": datetime(2025, 1, 1),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

logger = logging.getLogger(__name__)


@dag(
    default_args=DEFAULT_ARGS,
    schedule="@hourly",
Member

Just want to make sure we really want to run this hourly.

Collaborator Author

According to the bot policy, the minimum traffic should exceed 1,000 requests per day across multiple domains, so a higher frequency is required.
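
As a rough check on the schedule discussion above, the arithmetic can be sketched as follows. It assumes each run requests every one of the 100 domains returned by the Radar call exactly once, and that every request counts toward the policy threshold. `requests_per_day` and `POLICY_MINIMUM` are hypothetical names used only for illustration.

```python
# Hypothetical threshold, taken from the "1,000 requests per day" figure quoted above.
POLICY_MINIMUM = 1_000


def requests_per_day(domains_per_run: int, runs_per_day: int) -> int:
    """Total requests sent in a day if every run hits every domain once."""
    return domains_per_run * runs_per_day


# Compare a daily schedule with the hourly one used in the DAG.
for label, runs in [("@daily", 1), ("@hourly", 24)]:
    total = requests_per_day(100, runs)
    print(label, total, total > POLICY_MINIMUM)
```

Under these assumptions a daily run sends 100 requests and an hourly run sends 2,400, so the hourly schedule clears the threshold. Any schedule with at least 11 runs per day would as well.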

    max_active_runs=1,
    catchup=False,
)
def PYCONTW_ETL_BOT_v1():
Member

Probably a good idea to make the DAG ID more readable.


    @task
    def GET_TOP_WEBSITES() -> list[str]:
Member

Probably a good idea to start using lowercase names.

Suggested change
    def GET_TOP_WEBSITES() -> list[str]:
    def get_top_websites() -> list[str]:

        """Call Cloudflare Radar and return a list of the top-100 domains.

        Docs: https://developers.cloudflare.com/api/resources/radar/subresources/ranking/methods/top/
        """
        token = Variable.get("CLOUDFLARE_RADAR_API_TOKEN")

        url = "https://api.cloudflare.com/client/v4/radar/ranking/top"
        params = {"limit": 100}  # 100 is the maximum allowed
        headers = {"Authorization": f"Bearer {token}"}

        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        # Radar API response: {"result":{"top_0":[{"domain":"google.com", ...}, ...]}}
        domains = [item["domain"] for item in data.get("result", {}).get("top_0", [])]

        logger.info("Fetched %d domains from Cloudflare Radar", len(domains))
        return domains
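
The parsing step in the task above can be exercised without calling the API. This is a minimal sketch, assuming the response has the shape described in the code comment (`{"result": {"top_0": [{"domain": ...}, ...]}}`), which has not been checked against the live endpoint here. `extract_domains` and the sample payload are hypothetical and exist only for illustration.

```python
from typing import Any


def extract_domains(payload: dict[str, Any]) -> list[str]:
    """Pull domain names out of a Radar-style ranking payload."""
    entries = payload.get("result", {}).get("top_0", [])
    # Skip entries that lack a "domain" key instead of raising KeyError.
    return [item["domain"] for item in entries if "domain" in item]


# Hypothetical sample that mirrors the shape noted in the diff comment.
sample = {
    "result": {
        "top_0": [
            {"domain": "google.com"},
            {"domain": "example.com"},
            {"rank": 3},  # malformed entry without a domain
        ]
    }
}

print(extract_domains(sample))
print(extract_domains({}))
```

Keeping the parsing in a plain function like this would make it possible to unit-test the response handling separately from the network call.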

    @task
    def REQUEST_EACH_WEBSITE(domains: list[str]):
Member

Suggested change
    def REQUEST_EACH_WEBSITE(domains: list[str]):
    def ping_each_website(domains: list[str]) -> None:

Looks like we're pinging these sites?

        """Iterate through each domain and fire a GET request."""
        for domain in domains:
            site_url = f"https://www.{domain}"  # request to the www subdomain
            try:
                headers = {
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                    "Accept-Language": "zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7",
                    "User-Agent": "PYCONTWETL Bot",
                }
                resp = requests.get(site_url, headers=headers, timeout=5, allow_redirects=True)
                logger.info("GET %s -> %s", site_url, resp.status_code)
            except Exception as exc:
Member

Suggested change
            except Exception as exc:
            except Exception as exc:

We probably should catch a narrower exception.

                logger.warning("Failed to reach %s: %s", site_url, exc)

    top_domains = GET_TOP_WEBSITES()
    REQUEST_EACH_WEBSITE(top_domains)


dag_obj = PYCONTW_ETL_BOT_v1()

if __name__ == "__main__":
    dag_obj.test()