diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b672c74..d29f0d4e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## 6.23.0 - May 21, 2025
+* Introduces a --http-resilience flag for the os-update command, which runs the scraper with techniques
+  that help avoid getting disconnected from the source server.
+
 ## 6.22.3 - May 14, 2025
 * Use DAG run start time for archiving scrape output
diff --git a/openstates/cli/update.py b/openstates/cli/update.py
index 4b0a702c..0f707389 100644
--- a/openstates/cli/update.py
+++ b/openstates/cli/update.py
@@ -91,6 +91,7 @@ def do_scrape(
             fastmode=args.fastmode,
             realtime=args.realtime,
             file_archiving_enabled=args.archive,
+            http_resilience_mode=args.http_resilience,
         )
         report["jurisdiction"] = jscraper.do_scrape()
         stats.write_stats(
@@ -130,6 +131,7 @@ def do_scrape(
                 fastmode=args.fastmode,
                 realtime=args.realtime,
                 file_archiving_enabled=args.archive,
+                http_resilience_mode=args.http_resilience,
             )
             partial_report = scraper.do_scrape(**scrape_args, session=session)
             stats.write_stats(
@@ -163,6 +165,7 @@ def do_scrape(
                 fastmode=args.fastmode,
                 realtime=args.realtime,
                 file_archiving_enabled=args.archive,
+                http_resilience_mode=args.http_resilience,
             )
             report[scraper_name] = scraper.do_scrape(**scrape_args)
             session = scrape_args.get("session", "")
@@ -554,6 +557,13 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
         dest="SCRAPELIB_RETRY_WAIT_SECONDS",
     )
 
+    # HTTP resilience mode: enable random delays, rotating user agents, and more robust retries
+    parser.add_argument(
+        "--http-resilience",
+        action="store_true",
+        help="enable HTTP resilience mode, defaults to false",
+    )
+
     # realtime mode
     parser.add_argument("--realtime", action="store_true", help="enable realtime mode")
 
diff --git a/openstates/scrape/base.py b/openstates/scrape/base.py
index 0408b2ff..783b9460 100644
--- a/openstates/scrape/base.py
+++ b/openstates/scrape/base.py
@@ -1,11 +1,16 @@
 import boto3  # noqa
 import datetime
+from http.client import RemoteDisconnected
 import importlib
 import json
 import jsonschema
 import logging
 import os
+import random
+import requests
 import scrapelib
+import time
+from urllib.error import URLError
 import uuid
 from collections import defaultdict, OrderedDict
 from jsonschema import Draft3Validator, FormatChecker
@@ -66,18 +71,35 @@ def clean_whitespace(obj):
     return obj
 
 
+def get_random_user_agent():
+    """
+    Return a random user agent to help avoid detection.
+ """ + user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", + "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59", + ] + return random.choice(user_agents) + + class Scraper(scrapelib.Scraper): """Base class for all scrapers""" def __init__( - self, - jurisdiction, - datadir, - *, - strict_validation=True, - fastmode=False, - realtime=False, - file_archiving_enabled=False, + self, + jurisdiction, + datadir, + *, + strict_validation=True, + fastmode=False, + realtime=False, + file_archiving_enabled=False, + http_resilience_mode=False, ): super(Scraper, self).__init__() @@ -94,6 +116,19 @@ def __init__( self.retry_wait_seconds = settings.SCRAPELIB_RETRY_WAIT_SECONDS self.verify = settings.SCRAPELIB_VERIFY + # HTTP connection resilience settings + self.http_resilience_mode = http_resilience_mode + self.http_resilience_headers = {} + # http resilience: Set up a circuit breaker to track consecutive failures + self._consecutive_failures = 0 + self._max_consecutive_failures = 3 + self._circuit_breaker_timeout = 120 # 2 minutes + # http resilience: Set up connection pool reset + self._last_reset_time = time.time() + self._reset_interval = 600 # Reset connection pool every 10 minutes + self._random_delay_on_failure_min = 5 + self._random_delay_on_failure_max = 15 + # output self.output_file_path = None @@ -126,6 +161,10 @@ def __init__( handler = importlib.import_module(modname) self.scrape_output_handler = handler.Handler(self) + if self.http_resilience_mode: + self.headers["User-Agent"] = get_random_user_agent() + self._create_fresh_session() + def push_to_queue(self): """Push this output to the sqs for realtime imports.""" @@ -185,9 +224,8 @@ def save_object(self, obj): # Remove redundant prefix try: - upload_file_path = file_path[ - file_path.index("_data") + len("_data") + 1 : - ] + index = file_path.index("_data") + len("_data") + 1 + upload_file_path = file_path[index:] except Exception: upload_file_path = file_path @@ -268,6 +306,165 @@ def scrape(self, **kwargs): self.__class__.__name__ + " must provide a scrape() method" ) + def request_resiliently(self, request_func): + try: + # Reset connection pool if needed + self._reset_connection_pool_if_needed() + + # Add a random delay between processing items + self.add_random_delay(1, 3) + + # If we've had too many consecutive failures, pause for a while + if self._consecutive_failures >= self._max_consecutive_failures: + self.logger.warning( + f"Circuit breaker triggered after {self._consecutive_failures} consecutive failures. " + f"Pausing for {self._circuit_breaker_timeout} seconds." 
+                )
+                time.sleep(self._circuit_breaker_timeout)
+                self._consecutive_failures = 0
+
+                # Rotate user agent after circuit breaker timeout
+                self.headers["User-Agent"] = get_random_user_agent()
+
+            response = self.retry_on_connection_error(
+                request_func,
+                max_retries=3,
+                initial_backoff=10,
+                max_backoff=120,
+            )
+
+            # Reset consecutive failures counter on success
+            self._consecutive_failures = 0
+
+            return response
+        except Exception as e:
+            self._consecutive_failures += 1
+            self.logger.error(f"Error processing item: {e}")
+
+            # If it's a connection error, add a longer delay
+            if isinstance(e, (ConnectionError, RemoteDisconnected)):
+                self.logger.warning("Connection error. Adding longer delay.")
+                self.add_random_delay(self._random_delay_on_failure_min, self._random_delay_on_failure_max)
+
+                # Rotate user agent after connection error
+                self.headers["User-Agent"] = get_random_user_agent()
+
+    def get(self, url, **kwargs):
+        request_func = lambda: super(Scraper, self).get(url, **kwargs)  # noqa: E731
+        if self.http_resilience_mode:
+            return self.request_resiliently(request_func)
+        else:
+            return super().get(url, **kwargs)
+
+    def post(self, url, data=None, json=None, **kwargs):
+        request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)  # noqa: E731
+        if self.http_resilience_mode:
+            return self.request_resiliently(request_func)
+        else:
+            return super().post(url, data=data, json=json, **kwargs)
+
+    def retry_on_connection_error(self, func, max_retries=5, initial_backoff=2, max_backoff=60):
+        """
+        Retry a function call on connection errors with exponential backoff.
+
+        Args:
+            func: Function to call
+            max_retries: Maximum number of retries
+            initial_backoff: Initial backoff time in seconds
+            max_backoff: Maximum backoff time in seconds
+
+        Returns:
+            The result of the function call
+        """
+        retries = 0
+        backoff = initial_backoff
+
+        while True:
+            try:
+                return func()
+            except (
+                ConnectionError,
+                RemoteDisconnected,
+                URLError,
+                requests.exceptions.Timeout,
+                requests.exceptions.RequestException,
+            ) as e:
+                retries += 1
+                if retries > max_retries:
+                    self.logger.error(f"Max retries ({max_retries}) exceeded. Last error: {e}")
+                    raise
+
+                # Calculate backoff with jitter
+                jitter = random.uniform(0.8, 1.2)
+                current_backoff = min(backoff * jitter, max_backoff)
+
+                self.logger.warning(
+                    f"Connection error: {e}. Retrying in {current_backoff:.2f} seconds (attempt {retries}/{max_retries})"
+                )
+                time.sleep(current_backoff)
+
+                # Increase backoff for next retry
+                backoff = min(backoff * 2, max_backoff)
+
+    def _create_fresh_session(self):
+        """
+        Create a fresh session with appropriate settings.
+        """
+        if hasattr(self, "session"):
+            self.session.close()
+
+        # Create a new session
+        self.session = requests.Session()
+
+        # Set any custom headers
+        self.session.headers.update(self.http_resilience_headers)
+
+        # Set up retry mechanism
+        adapter = requests.adapters.HTTPAdapter(
+            max_retries=self.retry_attempts,
+            pool_connections=10,
+            pool_maxsize=10,
+            pool_block=False,
+        )
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
+
+        self.headers["User-Agent"] = get_random_user_agent()
+
+        self.logger.info(
+            f"Created fresh session with user agent: {self.headers['User-Agent']}"
+        )
+
+        return self.session
+
+    def _reset_connection_pool_if_needed(self):
+        """
+        Reset the connection pool if it's been too long since the last reset.
+        This helps prevent "Remote end closed connection without response" errors.
+ """ + current_time = time.time() + if current_time - self._last_reset_time > self._reset_interval: + self.logger.info( + f"Resetting connection pool after {self._reset_interval} seconds" + ) + + # Create a fresh session + self._create_fresh_session() + + self._last_reset_time = current_time + + def add_random_delay(self, min_seconds=1, max_seconds=3): + """ + Add a random delay to simulate human behavior. + + Args: + min_seconds: Minimum delay in seconds + max_seconds: Maximum delay in seconds + """ + delay = random.uniform(min_seconds, max_seconds) + self.logger.debug(f"Adding random delay of {delay:.2f} seconds") + time.sleep(delay) + class BaseBillScraper(Scraper): skipped = 0 @@ -400,15 +597,15 @@ def add_link(self, url, *, note=""): class AssociatedLinkMixin(object): def _add_associated_link( - self, - collection, - note, - url, - *, - media_type, - on_duplicate="warn", - date="", - classification="", + self, + collection, + note, + url, + *, + media_type, + on_duplicate="warn", + date="", + classification="", ): if on_duplicate not in ["error", "ignore", "warn"]: raise ScrapeValueError("on_duplicate must be 'warn', 'error' or 'ignore'") @@ -435,7 +632,7 @@ def _add_associated_link( seen_links.add(link["url"]) if all( - ver.get(x) == item.get(x) for x in ["note", "date", "classification"] + ver.get(x) == item.get(x) for x in ["note", "date", "classification"] ): matches = matches + 1 ver = item diff --git a/pyproject.toml b/pyproject.toml index cfbaf4b9..945127de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openstates" -version = "6.22.3" +version = "6.23.0" description = "core infrastructure for the openstates project" authors = ["James Turk "] license = "MIT"