
Add HTTP resilience mode #176


Merged: 4 commits on May 21, 2025
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## 6.23.0 - May 21, 2025
* Introduces the --http-resilience flag for the os-update command, which runs scrapers with techniques that help avoid getting disconnected from a source server.

## 6.22.3 - May 14, 2025
* Use DAG run start time for archiving scrape output

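In practice the flag is simply appended to an existing os-update invocation. A usage sketch, assuming the usual positional arguments (the jurisdiction and scraper names below are placeholders, not part of this change):

    os-update nc bills --http-resilience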
10 changes: 10 additions & 0 deletions openstates/cli/update.py
@@ -91,6 +91,7 @@ def do_scrape(
fastmode=args.fastmode,
realtime=args.realtime,
file_archiving_enabled=args.archive,
http_resilience_mode=args.http_resilience,
)
report["jurisdiction"] = jscraper.do_scrape()
stats.write_stats(
@@ -130,6 +131,7 @@ def do_scrape(
fastmode=args.fastmode,
realtime=args.realtime,
file_archiving_enabled=args.archive,
http_resilience_mode=args.http_resilience,
)
partial_report = scraper.do_scrape(**scrape_args, session=session)
stats.write_stats(
@@ -163,6 +165,7 @@ def do_scrape(
fastmode=args.fastmode,
realtime=args.realtime,
file_archiving_enabled=args.archive,
http_resilience_mode=args.http_resilience,
)
report[scraper_name] = scraper.do_scrape(**scrape_args)
session = scrape_args.get("session", "")
@@ -554,6 +557,13 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
dest="SCRAPELIB_RETRY_WAIT_SECONDS",
)

# HTTP resilience mode: random delays, rotating user agents, and more robust retries
parser.add_argument(
"--http-resilience",
action="store_true",
help="enable HTTP resilience mode, defaults to false",
)

# realtime mode
parser.add_argument("--realtime", action="store_true", help="enable realtime mode")

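Because the flag uses action="store_true" with no explicit dest, argparse exposes it as args.http_resilience, defaulting to False. A minimal standalone sketch of that behavior (the bare ArgumentParser below is illustrative, not the real openstates CLI parser):

import argparse

# Illustrative parser reproducing just the new flag
parser = argparse.ArgumentParser()
parser.add_argument(
    "--http-resilience",
    action="store_true",
    help="enable HTTP resilience mode (defaults to false)",
)

args = parser.parse_args(["--http-resilience"])
assert args.http_resilience is True  # omitting the flag would leave it False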
239 changes: 218 additions & 21 deletions openstates/scrape/base.py
@@ -1,11 +1,16 @@
import boto3 # noqa
import datetime
from http.client import RemoteDisconnected
import importlib
import json
import jsonschema
import logging
import os
import random
import requests
import scrapelib
import time
from urllib.error import URLError
import uuid
from collections import defaultdict, OrderedDict
from jsonschema import Draft3Validator, FormatChecker
@@ -66,18 +71,35 @@ def clean_whitespace(obj):
return obj


def get_random_user_agent():
"""
Return a random user agent to help avoid detection.
"""
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59",
]
return random.choice(user_agents)


class Scraper(scrapelib.Scraper):
"""Base class for all scrapers"""

def __init__(
self,
jurisdiction,
datadir,
*,
strict_validation=True,
fastmode=False,
realtime=False,
file_archiving_enabled=False,
http_resilience_mode=False,
):
super(Scraper, self).__init__()

@@ -94,6 +116,19 @@ def __init__(
self.retry_wait_seconds = settings.SCRAPELIB_RETRY_WAIT_SECONDS
self.verify = settings.SCRAPELIB_VERIFY

# HTTP connection resilience settings
self.http_resilience_mode = http_resilience_mode
self.http_resilience_headers = {}
# http resilience: Set up a circuit breaker to track consecutive failures
self._consecutive_failures = 0
self._max_consecutive_failures = 3
self._circuit_breaker_timeout = 120 # 2 minutes
# http resilience: Set up connection pool reset
self._last_reset_time = time.time()
self._reset_interval = 600 # Reset connection pool every 10 minutes
self._random_delay_on_failure_min = 5
self._random_delay_on_failure_max = 15

# output
self.output_file_path = None

@@ -126,6 +161,10 @@ def __init__(
handler = importlib.import_module(modname)
self.scrape_output_handler = handler.Handler(self)

if self.http_resilience_mode:
self.headers["User-Agent"] = get_random_user_agent()
self._create_fresh_session()

def push_to_queue(self):
"""Push this output to the sqs for realtime imports."""

@@ -185,9 +224,8 @@ def save_object(self, obj):

# Remove redundant prefix
try:
upload_file_path = file_path[
file_path.index("_data") + len("_data") + 1 :
]
index = file_path.index("_data") + len("_data") + 1
upload_file_path = file_path[index:]
except Exception:
upload_file_path = file_path

@@ -268,6 +306,165 @@ def scrape(self, **kwargs):
self.__class__.__name__ + " must provide a scrape() method"
)

def request_resiliently(self, request_func):
try:
# Reset connection pool if needed
self._reset_connection_pool_if_needed()

# Add a random delay between processing items
self.add_random_delay(1, 3)

# If we've had too many consecutive failures, pause for a while
if self._consecutive_failures >= self._max_consecutive_failures:
self.logger.warning(
f"Circuit breaker triggered after {self._consecutive_failures} consecutive failures. "
f"Pausing for {self._circuit_breaker_timeout} seconds."
)
time.sleep(self._circuit_breaker_timeout)
self._consecutive_failures = 0

# Rotate user agent after circuit breaker timeout
self.headers["User-Agent"] = get_random_user_agent()

response = self.retry_on_connection_error(
request_func,
max_retries=3,
initial_backoff=10,
max_backoff=120,
)

# Reset consecutive failures counter on success
self._consecutive_failures = 0

return response
except Exception as e:
self._consecutive_failures += 1
self.logger.error(f"Error processing item: {e}")

# If it's a connection error, add a longer delay
if isinstance(e, (ConnectionError, RemoteDisconnected)):
self.logger.warning("Connection error. Adding longer delay.")
self.add_random_delay(self._random_delay_on_failure_min, self._random_delay_on_failure_max)

# Rotate user agent after connection error
self.headers["User-Agent"] = get_random_user_agent()
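# Note: the exception is not re-raised, so callers receive None when resilience handling ultimately fails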

def get(self, url, **kwargs):
request_func = lambda: super(Scraper, self).get(url, **kwargs) # noqa: E731
if self.http_resilience_mode:
return self.request_resiliently(request_func)
else:
return super().get(url, **kwargs)

def post(self, url, data=None, json=None, **kwargs):
request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)  # noqa: E731
if self.http_resilience_mode:
return self.request_resiliently(request_func)
else:
return super().post(url, data=data, json=json, **kwargs)

def retry_on_connection_error(self, func, max_retries=5, initial_backoff=2, max_backoff=60):
"""
Retry a function call on connection errors with exponential backoff.

Args:
func: Function to call
max_retries: Maximum number of retries
initial_backoff: Initial backoff time in seconds
max_backoff: Maximum backoff time in seconds

Returns:
The result of the function call
"""
retries = 0
backoff = initial_backoff

while True:
try:
return func()
except (
ConnectionError,
RemoteDisconnected,
URLError,
requests.exceptions.Timeout,
requests.exceptions.RequestException,
) as e:
retries += 1
if retries > max_retries:
self.logger.error(f"Max retries ({max_retries}) exceeded. Last error: {e}")
raise

# Calculate backoff with jitter
jitter = random.uniform(0.8, 1.2)
current_backoff = min(backoff * jitter, max_backoff)

self.logger.warning(
f"Connection error: {e}. Retrying in {current_backoff:.2f} seconds (attempt {retries}/{max_retries})"
)
time.sleep(current_backoff)

# Increase backoff for next retry
backoff = min(backoff * 2, max_backoff)

def _create_fresh_session(self):
"""
Create a fresh session with appropriate settings.
"""
if hasattr(self, "session"):
self.session.close()

# Create a new session
self.session = requests.Session()

# Set any custom headers
self.session.headers.update(self.http_resilience_headers)

# Set up retry mechanism
adapter = requests.adapters.HTTPAdapter(
max_retries=self.retry_attempts,
pool_connections=10,
pool_maxsize=10,
pool_block=False,
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)

self.headers["User-Agent"] = get_random_user_agent()

self.logger.info(
f"Created fresh session with user agent: {self.headers['User-Agent']}"
)

return self.session

def _reset_connection_pool_if_needed(self):
"""
Reset the connection pool if it's been too long since the last reset.
This helps prevent "Remote end closed connection without response" errors.
"""
current_time = time.time()
if current_time - self._last_reset_time > self._reset_interval:
self.logger.info(
f"Resetting connection pool after {self._reset_interval} seconds"
)

# Create a fresh session
self._create_fresh_session()

self._last_reset_time = current_time

def add_random_delay(self, min_seconds=1, max_seconds=3):
"""
Add a random delay to simulate human behavior.

Args:
min_seconds: Minimum delay in seconds
max_seconds: Maximum delay in seconds
"""
delay = random.uniform(min_seconds, max_seconds)
self.logger.debug(f"Adding random delay of {delay:.2f} seconds")
time.sleep(delay)


class BaseBillScraper(Scraper):
skipped = 0
@@ -400,15 +597,15 @@ def add_link(self, url, *, note=""):

class AssociatedLinkMixin(object):
def _add_associated_link(
self,
collection,
note,
url,
*,
media_type,
on_duplicate="warn",
date="",
classification="",
):
if on_duplicate not in ["error", "ignore", "warn"]:
raise ScrapeValueError("on_duplicate must be 'warn', 'error' or 'ignore'")
@@ -435,7 +632,7 @@ def _add_associated_link(
seen_links.add(link["url"])

if all(
ver.get(x) == item.get(x) for x in ["note", "date", "classification"]
):
matches = matches + 1
ver = item
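The backoff schedule in retry_on_connection_error doubles the wait after each failed attempt, applies 0.8x-1.2x jitter, and caps the wait at max_backoff. A condensed, standalone sketch of just that schedule (it catches any exception, unlike the method above, which targets specific connection errors):

import random
import time


def retry_with_backoff(func, max_retries=5, initial_backoff=2, max_backoff=60):
    """Standalone sketch of the backoff schedule used by retry_on_connection_error."""
    retries = 0
    backoff = initial_backoff
    while True:
        try:
            return func()
        except Exception as exc:
            retries += 1
            if retries > max_retries:
                raise
            # apply 0.8x-1.2x jitter and cap the wait at max_backoff
            wait = min(backoff * random.uniform(0.8, 1.2), max_backoff)
            print(f"attempt {retries}/{max_retries} failed ({exc}); sleeping {wait:.2f}s")
            time.sleep(wait)
            # double the base backoff for the next attempt
            backoff = min(backoff * 2, max_backoff)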
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "openstates"
version = "6.22.3"
version = "6.23.0"
description = "core infrastructure for the openstates project"
authors = ["James Turk <dev@jamesturk.net>"]
license = "MIT"
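The resilience settings added in __init__ (_max_consecutive_failures=3, _circuit_breaker_timeout=120) drive the check at the top of request_resiliently. A standalone sketch of that counter logic, stripped of the user-agent rotation and connection-pool reset the real method also performs (CircuitBreakerSketch is illustrative and does not exist in this PR):

import time


class CircuitBreakerSketch:
    """Illustrative mirror of the consecutive-failure handling in Scraper.request_resiliently."""

    def __init__(self, max_failures=3, timeout=120):
        self.failures = 0
        self.max_failures = max_failures
        self.timeout = timeout

    def call(self, request_func):
        if self.failures >= self.max_failures:
            # too many consecutive failures: pause, then start counting again
            time.sleep(self.timeout)
            self.failures = 0
        try:
            result = request_func()
            self.failures = 0  # any success resets the counter
            return result
        except Exception:
            self.failures += 1
            return None  # like request_resiliently, the error is swallowed rather than re-raised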