From 2b7726a0c089d749090f5f7d160eab6df2ed6e9d Mon Sep 17 00:00:00 2001
From: Jesse Mortenson
Date: Wed, 7 May 2025 17:32:06 -0500
Subject: [PATCH 1/3] Add http resilience mode (first draft)

---
 openstates/cli/update.py  |  10 ++
 openstates/scrape/base.py | 198 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 208 insertions(+)

diff --git a/openstates/cli/update.py b/openstates/cli/update.py
index 754f3ddc..53ffc077 100644
--- a/openstates/cli/update.py
+++ b/openstates/cli/update.py
@@ -90,6 +90,7 @@ def do_scrape(
         fastmode=args.fastmode,
         realtime=args.realtime,
         file_archiving_enabled=args.archive,
+        http_resilience_mode=args.http_resilience,
     )
     report["jurisdiction"] = jscraper.do_scrape()
     stats.write_stats(
@@ -129,6 +130,7 @@ def do_scrape(
             fastmode=args.fastmode,
             realtime=args.realtime,
             file_archiving_enabled=args.archive,
+            http_resilience_mode=args.http_resilience,
         )
         partial_report = scraper.do_scrape(**scrape_args, session=session)
         last_scrape_end_datetime = partial_report["end"]
@@ -163,6 +165,7 @@ def do_scrape(
             fastmode=args.fastmode,
             realtime=args.realtime,
             file_archiving_enabled=args.archive,
+            http_resilience_mode=args.http_resilience,
         )
         report[scraper_name] = scraper.do_scrape(**scrape_args)
         last_scrape_end_datetime = report[scraper_name]["end"]
@@ -555,6 +558,13 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
         dest="SCRAPELIB_RETRY_WAIT_SECONDS",
     )
 
+    # HTTP resilience mode: enable random delays, user agents, more complicated retries
+    parser.add_argument(
+        "--http-resilience",
+        action="store_true",
+        help="enable HTTP resilience mode, defaults to false",
+    )
+
     # realtime mode
     parser.add_argument("--realtime", action="store_true", help="enable realtime mode")
 
diff --git a/openstates/scrape/base.py b/openstates/scrape/base.py
index 0408b2ff..b6d4c514 100644
--- a/openstates/scrape/base.py
+++ b/openstates/scrape/base.py
@@ -1,11 +1,16 @@
 import boto3  # noqa
 import datetime
+from http.client import RemoteDisconnected
 import importlib
 import json
 import jsonschema
 import logging
 import os
+import random
+import requests
 import scrapelib
+import time
+from urllib.error import URLError
 import uuid
 from collections import defaultdict, OrderedDict
 from jsonschema import Draft3Validator, FormatChecker
@@ -66,6 +71,22 @@ def clean_whitespace(obj):
     return obj
 
 
+def get_random_user_agent():
+    """
+    Return a random user agent to help avoid detection.
+    """
+    user_agents = [
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
+        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
+        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59",
+    ]
+    return random.choice(user_agents)
+
+
 class Scraper(scrapelib.Scraper):
     """Base class for all scrapers"""
 
@@ -78,6 +99,7 @@ def __init__(
         fastmode=False,
         realtime=False,
         file_archiving_enabled=False,
+        http_resilience_mode=False,
     ):
         super(Scraper, self).__init__()
 
@@ -94,6 +116,18 @@ def __init__(
         self.retry_wait_seconds = settings.SCRAPELIB_RETRY_WAIT_SECONDS
         self.verify = settings.SCRAPELIB_VERIFY
 
+        # HTTP connection resilience settings
+        self.http_resilience_mode = http_resilience_mode
+        # http resilience: Set up a circuit breaker to track consecutive failures
+        self._consecutive_failures = 0
+        self._max_consecutive_failures = 3
+        self._circuit_breaker_timeout = 120  # 2 minutes
+        # http resilience: Set up connection pool reset
+        self._last_reset_time = time.time()
+        self._reset_interval = 600  # Reset connection pool every 10 minutes
+        self._random_delay_on_failure_min = 5
+        self._random_delay_on_failure_max = 15
+
         # output
         self.output_file_path = None
 
@@ -126,6 +160,10 @@ def __init__(
             handler = importlib.import_module(modname)
             self.scrape_output_handler = handler.Handler(self)
 
+        if self.http_resilience_mode:
+            self.headers["User-Agent"] = get_random_user_agent()
+            self._create_fresh_session()
+
     def push_to_queue(self):
         """Push this output to the sqs for realtime imports."""
 
@@ -268,6 +306,166 @@ def scrape(self, **kwargs):
             self.__class__.__name__ + " must provide a scrape() method"
         )
 
+    def request_resiliently(self, request_func):
+        try:
+            # Reset connection pool if needed
+            self._reset_connection_pool_if_needed()
+
+            # Add a random delay between processing items
+            self.add_random_delay(1, 3)
+
+            # If we've had too many consecutive failures, pause for a while
+            if self._consecutive_failures >= self._max_consecutive_failures:
+                self.logger.warning(
+                    f"Circuit breaker triggered after {self._consecutive_failures} consecutive failures. "
+                    f"Pausing for {self._circuit_breaker_timeout} seconds."
+                )
+                time.sleep(self._circuit_breaker_timeout)
+                self._consecutive_failures = 0
+
+                # Rotate user agent after circuit breaker timeout
+                self.headers["User-Agent"] = get_random_user_agent()
+
+            response = self.retry_on_connection_error(
+                request_func,
+                max_retries=3,
+                initial_backoff=10,
+                max_backoff=120,
+            )
+
+            # Reset consecutive failures counter on success
+            self._consecutive_failures = 0
+
+            return response
+        except Exception as e:
+            self._consecutive_failures += 1
+            self.logger.error(f"Error processing item: {e}")
+
+            # If it's a connection error, add a longer delay
+            if isinstance(e, (ConnectionError, RemoteDisconnected)):
+                self.logger.warning("Connection error. Adding longer delay.")
+                self.add_random_delay(self._random_delay_on_failure_min, self._random_delay_on_failure_max)
+
+                # Rotate user agent after connection error
+                self.headers["User-Agent"] = get_random_user_agent()
+
+    def get(self, url, **kwargs):
+        request_func = lambda: super(Scraper, self).get(url, **kwargs)
+        if self.http_resilience_mode:
+            self.request_resiliently(request_func)
+        else:
+            return super().get(url, **kwargs)
+
+    def post(self, url, data=None, json=None, **kwargs):
+        request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)
+        if self.http_resilience_mode:
+            self.request_resiliently(request_func)
+        else:
+            return super().post(url, data=data, json=json, **kwargs)
+
+    def retry_on_connection_error(self, func, max_retries=5, initial_backoff=2, max_backoff=60):
+        """
+        Retry a function call on connection errors with exponential backoff.
+
+        Args:
+            func: Function to call
+            max_retries: Maximum number of retries
+            initial_backoff: Initial backoff time in seconds
+            max_backoff: Maximum backoff time in seconds
+
+        Returns:
+            The result of the function call
+        """
+        retries = 0
+        backoff = initial_backoff
+
+        while True:
+            try:
+                return func()
+            except (
+                ConnectionError,
+                RemoteDisconnected,
+                URLError,
+                requests.exceptions.Timeout,
+                requests.exceptions.RequestException,
+            ) as e:
+                retries += 1
+                if retries > max_retries:
+                    self.logger.error(f"Max retries ({max_retries}) exceeded. Last error: {e}")
+                    raise
+
+                # Calculate backoff with jitter
+                jitter = random.uniform(0.8, 1.2)
+                current_backoff = min(backoff * jitter, max_backoff)
+
+                self.logger.warning(
+                    f"Connection error: {e}. Retrying in {current_backoff:.2f} seconds (attempt {retries}/{max_retries})"
+                )
+                time.sleep(current_backoff)
+
+                # Increase backoff for next retry
+                backoff = min(backoff * 2, max_backoff)
+
+    def _create_fresh_session(self):
+        """
+        Create a fresh session with appropriate settings.
+        """
+        if hasattr(self, "session"):
+            self.session.close()
+
+        # Create a new session
+        self.session = requests.Session()
+
+        # IA specific thing
+        self.session.headers.update({"X-Requested-With": "XMLHttpRequest"})
+
+        # Set up retry mechanism
+        adapter = requests.adapters.HTTPAdapter(
+            max_retries=self.retry_attempts,
+            pool_connections=10,
+            pool_maxsize=10,
+            pool_block=False,
+        )
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
+
+        self.headers["User-Agent"] = get_random_user_agent()
+
+        self.logger.info(
+            f"Created fresh session with user agent: {self.headers['User-Agent']}"
+        )
+
+        return self.session
+
+    def _reset_connection_pool_if_needed(self):
+        """
+        Reset the connection pool if it's been too long since the last reset.
+        This helps prevent "Remote end closed connection without response" errors.
+        """
+        current_time = time.time()
+        if current_time - self._last_reset_time > self._reset_interval:
+            self.logger.info(
+                f"Resetting connection pool after {self._reset_interval} seconds"
+            )
+
+            # Create a fresh session
+            self._create_fresh_session()
+
+            self._last_reset_time = current_time
+
+    def add_random_delay(self, min_seconds=1, max_seconds=3):
+        """
+        Add a random delay to simulate human behavior.
+
+        Args:
+            min_seconds: Minimum delay in seconds
+            max_seconds: Maximum delay in seconds
+        """
+        delay = random.uniform(min_seconds, max_seconds)
+        self.logger.debug(f"Adding random delay of {delay:.2f} seconds")
+        time.sleep(delay)
+
+
 
 class BaseBillScraper(Scraper):
     skipped = 0

From 1f87b77b893ab15c49cc3f262851abfe3fba238d Mon Sep 17 00:00:00 2001
From: Jesse Mortenson
Date: Fri, 16 May 2025 15:49:56 -0500
Subject: [PATCH 2/3] Resilience mode: allow custom headers, cleanup

---
 openstates/scrape/base.py | 57 +++++++++++++++++++--------------------
 1 file changed, 28 insertions(+), 29 deletions(-)

diff --git a/openstates/scrape/base.py b/openstates/scrape/base.py
index b6d4c514..783b9460 100644
--- a/openstates/scrape/base.py
+++ b/openstates/scrape/base.py
@@ -91,15 +91,15 @@ class Scraper(scrapelib.Scraper):
     """Base class for all scrapers"""
 
     def __init__(
-        self,
-        jurisdiction,
-        datadir,
-        *,
-        strict_validation=True,
-        fastmode=False,
-        realtime=False,
-        file_archiving_enabled=False,
-        http_resilience_mode=False,
+            self,
+            jurisdiction,
+            datadir,
+            *,
+            strict_validation=True,
+            fastmode=False,
+            realtime=False,
+            file_archiving_enabled=False,
+            http_resilience_mode=False,
     ):
         super(Scraper, self).__init__()
 
@@ -118,6 +118,7 @@ def __init__(
 
         # HTTP connection resilience settings
         self.http_resilience_mode = http_resilience_mode
+        self.http_resilience_headers = {}
        # http resilience: Set up a circuit breaker to track consecutive failures
         self._consecutive_failures = 0
         self._max_consecutive_failures = 3
@@ -223,9 +224,8 @@ def save_object(self, obj):
 
             # Remove redundant prefix
             try:
-                upload_file_path = file_path[
-                    file_path.index("_data") + len("_data") + 1 :
-                ]
+                index = file_path.index("_data") + len("_data") + 1
+                upload_file_path = file_path[index:]
             except Exception:
                 upload_file_path = file_path
 
@@ -350,16 +350,16 @@ def request_resiliently(self, request_func):
                 self.headers["User-Agent"] = get_random_user_agent()
 
     def get(self, url, **kwargs):
-        request_func = lambda: super(Scraper, self).get(url, **kwargs)
+        request_func = lambda: super(Scraper, self).get(url, **kwargs)  # noqa: E731
         if self.http_resilience_mode:
-            self.request_resiliently(request_func)
+            return self.request_resiliently(request_func)
         else:
             return super().get(url, **kwargs)
 
     def post(self, url, data=None, json=None, **kwargs):
-        request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)
+        request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)  # noqa: E731
         if self.http_resilience_mode:
-            self.request_resiliently(request_func)
+            return self.request_resiliently(request_func)
         else:
             return super().post(url, data=data, json=json, **kwargs)
 
@@ -416,8 +416,8 @@ def _create_fresh_session(self):
         # Create a new session
         self.session = requests.Session()
 
-        # IA specific thing
-        self.session.headers.update({"X-Requested-With": "XMLHttpRequest"})
+        # Set any custom headers
+        self.session.headers.update(self.http_resilience_headers)
 
         # Set up retry mechanism
         adapter = requests.adapters.HTTPAdapter(
@@ -466,7 +466,6 @@ def add_random_delay(self, min_seconds=1, max_seconds=3):
         time.sleep(delay)
 
 
-
 class BaseBillScraper(Scraper):
     skipped = 0
 
@@ -598,15 +597,15 @@ def add_link(self, url, *, note=""):
 
 class AssociatedLinkMixin(object):
     def _add_associated_link(
-        self,
-        collection,
-        note,
-        url,
-        *,
-        media_type,
-        on_duplicate="warn",
-        date="",
-        classification="",
+            self,
+            collection,
+            note,
+            url,
+            *,
+            media_type,
+            on_duplicate="warn",
+            date="",
+            classification="",
     ):
         if on_duplicate not in ["error", "ignore", "warn"]:
             raise ScrapeValueError("on_duplicate must be 'warn', 'error' or 'ignore'")
@@ -633,7 +632,7 @@ def _add_associated_link(
             seen_links.add(link["url"])
 
             if all(
-                ver.get(x) == item.get(x) for x in ["note", "date", "classification"]
+                    ver.get(x) == item.get(x) for x in ["note", "date", "classification"]
             ):
                 matches = matches + 1
                 ver = item

From 6e330e630b1e01f944322c5d7dff0c4b00748fb1 Mon Sep 17 00:00:00 2001
From: Jesse Mortenson
Date: Wed, 21 May 2025 13:41:05 -0500
Subject: [PATCH 3/3] Bump release version

---
 CHANGELOG.md   | 4 ++++
 pyproject.toml | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b672c74..d29f0d4e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## 6.23.0 - May 21, 2025
+* Introduces the --http-resilience flag for the os-update command, which runs a scraper with techniques to avoid
+  getting disconnected from a source server.
+
 ## 6.22.3 - May 14, 2025
 * Use DAG run start time for archiving scrape output
 
diff --git a/pyproject.toml b/pyproject.toml
index cfbaf4b9..945127de 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "openstates"
-version = "6.22.3"
+version = "6.23.0"
 description = "core infrastructure for the openstates project"
 authors = ["James Turk "]
 license = "MIT"
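
Usage sketch (illustrative only, not part of the patches above): how the new option is expected to be exercised. The command-line form follows the CHANGELOG entry; the direct-construction form mirrors the keyword arguments that do_scrape() passes in patch 1. The scraper class name, jurisdiction, and data directory below are placeholders, not taken from this PR.

    # Hedged sketch -- placeholder names, assumptions noted in comments.
    #
    # CLI: append the new flag to a normal os-update invocation, e.g.
    #   os-update <jurisdiction-module> <scraper-names...> --http-resilience
    #
    # In code, do_scrape() forwards args.http_resilience as http_resilience_mode:
    scraper = ExampleBillScraper(          # hypothetical Scraper subclass
        jurisdiction,                      # supplied by os-update's usual wiring
        datadir,
        strict_validation=True,
        fastmode=False,
        realtime=False,
        file_archiving_enabled=False,
        http_resilience_mode=True,         # i.e. args.http_resilience
    )
    # With the mode enabled, scraper.get()/post() route through request_resiliently():
    # a 1-3 second random delay per request, retry_on_connection_error() with
    # exponential backoff and jitter, user-agent rotation, a circuit-breaker pause
    # after 3 consecutive failures, and a connection-pool reset every 10 minutes.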
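A second hedged sketch, for the http_resilience_headers hook that patch 2 introduces: _create_fresh_session() copies that dict onto every fresh requests.Session, replacing the previously hard-coded "IA specific" X-Requested-With header. The subclass name below is hypothetical and only illustrates one way a jurisdiction scraper might populate the hook.

    class IowaBillScraper(Scraper):  # hypothetical subclass, for illustration only
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Headers every resilient session should carry for this source; patch 2
            # moved the hard-coded X-Requested-With header out of base.py into this hook.
            self.http_resilience_headers = {"X-Requested-With": "XMLHttpRequest"}
            if self.http_resilience_mode:
                # Rebuild the session so the custom headers are actually applied,
                # since the base __init__ created its session before this assignment.
                self._create_fresh_session()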