
Commit 88e163d
Add a basic concurrent fetch script
This script takes the concurrent.futures and requests logic out of `peakbagger_list_latlong_data_to_gpx.py` and makes it a bit more generic.
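
For context, a minimal sketch of how a caller such as `peakbagger_list_latlong_data_to_gpx.py` might reuse the extracted helper. The module name `fetch_urls`, the output directory, and the example URLs are assumptions, since the new file's name is not shown in this diff:

    import pathlib

    import fetch_urls  # assumed module name for the new script

    # The output directory must already exist; the script does not create it.
    failed = fetch_urls.fetch_concurrent(
        output_dir=pathlib.Path("pages"),
        urls=["https://example.com/a.html", "https://example.com/b.html"],
        retries=3,
        max_workers=4,
        timeout=10.0,
    )
    if failed:
        print("Could not fetch:", failed)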
1 parent 9cd2786 commit 88e163d

File tree

1 file changed: +104 −0 lines changed
Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""Fetch a list of URLs concurrently and write each response into an output directory."""

import argparse
import concurrent.futures
import logging
import os
import pathlib
import sys
from typing import Optional

import requests


# pylint: disable=useless-return


def fetch(
    output_dir: pathlib.Path, url: str, retries: int, timeout: Optional[float]
) -> None:
    # Save the response body under the URL's basename inside output_dir.
    dest_path = str(output_dir / os.path.basename(url))

    logging.debug("Will fetch %r -> %r with timeout=%r.", url, dest_path, timeout)

    for try_left in range(retries - 1, -1, -1):
        try:
            resp = requests.get(url, stream=True, timeout=timeout)
            resp.raise_for_status()
            with open(dest_path, "w") as output_fp:
                output_fp.write(resp.text)
        except (requests.ConnectionError, requests.Timeout):
            if try_left == 0:
                raise
            logging.exception(
                "Fetching %r failed; will try fetching %d more times.", url, try_left
            )
        else:
            # Fetched and wrote the file successfully; stop retrying.
            return None
    return None


def fetch_concurrent(
    output_dir: pathlib.Path,
    urls: list[str],
    retries: int,
    max_workers: Optional[int],
    timeout: Optional[float],
) -> list[str]:
    # Fetch every URL on a thread pool and return the URLs that could not be fetched.
    failed_fetches = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_urls = {
            executor.submit(
                fetch, output_dir=output_dir, retries=retries, timeout=timeout, url=url
            ): url
            for url in urls
        }
        for future in concurrent.futures.as_completed(future_to_urls):
            url = future_to_urls[future]
            try:
                future.result()
            except Exception:  # pylint: disable=broad-except
                logging.exception("%s did not fetch successfully.", url)
                failed_fetches.append(url)
    return failed_fetches


def main(argv: Optional[list[str]] = None) -> int:
    def positive_number(value: str) -> int:
        int_value = int(value)
        if int_value <= 0:
            raise ValueError("Value must be a positive integer.")
        return int_value

    argparser = argparse.ArgumentParser()
    argparser.add_argument("--max-workers", default=None, type=int)
    argparser.add_argument("--output-dir", required=True)
    argparser.add_argument("--retries", default=1, type=positive_number)
    argparser.add_argument("--timeout", default=None, type=float)
    argparser.add_argument("--verbose", action="store_true", default=False)
    argparser.add_argument("urls", nargs="+")

    args = argparser.parse_args(argv)

    output_dir = pathlib.Path(args.output_dir)
    # output_dir.mkdir(exist_ok=True, parents=True)

    if args.verbose:
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True

    # logging.debug(f"Will fetch {args.urls!r}")

    failed_fetches = fetch_concurrent(
        output_dir=output_dir,
        urls=args.urls,
        retries=args.retries,
        max_workers=args.max_workers,
        timeout=args.timeout,
    )

    # logging.debug(f"Fetched {args.urls!r}")

    # The exit status is the number of URLs that could not be fetched.
    return len(failed_fetches)


if __name__ == "__main__":
    sys.exit(main())
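
A possible way to exercise the command-line interface, driving main() with an explicit argv list; the paths and URLs are illustrative only, and the output directory must already exist because the mkdir call above is commented out:

    # Mirrors a shell invocation such as
    #   ./script.py --output-dir ./downloads --retries 3 --timeout 10 --verbose URL...
    exit_code = main(
        [
            "--output-dir", "./downloads",
            "--retries", "3",
            "--timeout", "10",
            "--verbose",
            "https://example.com/a.html",
            "https://example.com/b.html",
        ]
    )
    # exit_code equals the number of URLs that failed to fetch (0 on full success).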
