From 5b92f8d16da1f57ea8bb25699727a4f74ee9b29b Mon Sep 17 00:00:00 2001 From: Benjamin Webb Date: Fri, 28 Feb 2025 17:48:17 -0500 Subject: [PATCH] Implement q priority sorting of requests --- pygeoapi/api/__init__.py | 41 +++++++++-------- pygeoapi/l10n.py | 95 ++++++---------------------------------- pygeoapi/util.py | 56 +++++++++++++++++++++++ tests/test_util.py | 29 ++++++++++++ 4 files changed, 120 insertions(+), 101 deletions(-) diff --git a/pygeoapi/api/__init__.py b/pygeoapi/api/__init__.py index f73b9b473..c548bc8fb 100644 --- a/pygeoapi/api/__init__.py +++ b/pygeoapi/api/__init__.py @@ -66,7 +66,8 @@ CrsTransformSpec, TEMPLATES, UrlPrefetcher, dategetter, filter_dict_by_key_value, filter_providers_by_type, get_api_rules, get_base_url, get_provider_by_type, get_provider_default, get_typed_value, - get_crs_from_uri, get_supported_crs_list, render_j2_template, to_json + get_crs_from_uri, get_supported_crs_list, render_j2_template, to_json, + get_choice_from_headers, get_from_headers ) LOGGER = logging.getLogger(__name__) @@ -150,7 +151,8 @@ def apply_gzip(headers: dict, content: Union[str, bytes]) -> Union[str, bytes]: Compress content if requested in header. 
""" charset = CHARSET[0] - if F_GZIP in headers.get('Content-Encoding', []): + + if F_GZIP in get_from_headers(headers, 'content-encoding'): try: if isinstance(content, bytes): # bytes means Content-Type needs to be set upstream @@ -305,16 +307,18 @@ def _get_locale(self, headers, supported_locales): raise ValueError(f"{self.__class__.__name__} must be initialized" f"with a list of valid supported locales") - for func, mapping in ((l10n.locale_from_params, self._args), - (l10n.locale_from_headers, headers)): - loc_str = func(mapping) - if loc_str: - if not raw: + for mapping, field in ((self._args, l10n.QUERY_PARAM), + (headers, 'accept-language')): + + loc_strs = get_choice_from_headers(mapping, field, all=True) + if loc_strs: + if raw is None: # This is the first-found locale string: set as raw - raw = loc_str + raw = get_from_headers(mapping, field) + # Check if locale string is a good match for the UI - loc = l10n.best_match(loc_str, supported_locales) - is_override = func is l10n.locale_from_params + loc = l10n.best_match(loc_strs, supported_locales) + is_override = field is l10n.QUERY_PARAM if loc != default_locale or is_override: return raw, loc @@ -335,17 +339,16 @@ def _get_format(self, headers) -> Union[str, None]: return format_ # Format not specified: get from Accept headers (MIME types) - # e.g. format_ = 'text/html' - h = headers.get('accept', headers.get('Accept', '')).strip() # noqa + # e.g. Accept: 'text/html;q=0.5,application/ld+json' + types_ = get_choice_from_headers(headers, 'accept', all=True) + if types_ is None: + return + (fmts, mimes) = zip(*FORMAT_TYPES.items()) - # basic support for complex types (i.e. 
with "q=0.x") - for type_ in (t.split(';')[0].strip() for t in h.split(',') if t): + for type_ in types_: if type_ in mimes: idx_ = mimes.index(type_) - format_ = fmts[idx_] - break - - return format_ or None + return fmts[idx_] @property def data(self) -> bytes: @@ -503,7 +506,7 @@ def get_response_headers(self, force_lang: l10n.Locale = None, if F_GZIP in FORMAT_TYPES: if force_encoding: headers['Content-Encoding'] = force_encoding - elif F_GZIP in self._headers.get('Accept-Encoding', ''): + elif F_GZIP in get_from_headers(self._headers, 'accept-encoding'): headers['Content-Encoding'] = F_GZIP return headers diff --git a/pygeoapi/l10n.py b/pygeoapi/l10n.py index 908888e45..de6fc86a6 100644 --- a/pygeoapi/l10n.py +++ b/pygeoapi/l10n.py @@ -113,7 +113,7 @@ def locale2str(value: Locale) -> str: def best_match(accept_languages, available_locales) -> Locale: """ - Takes an Accept-Languages string (from header or request query params) + Takes an Accept-Languages sorted list (from header or request query params) and finds the best matching locale from a list of available locales. This function provides a framework-independent alternative to the @@ -131,12 +131,12 @@ def best_match(accept_languages, available_locales) -> Locale: or unknown locale is ignored. However, if no `available_locales` are specified, a `LocaleError` is raised. - :param accept_languages: A Locale or string with one or more languages. + :param accept_languages: A Locale or list of one or more languages. This can be as simple as "de" for example, but it's also possible to include a territory (e.g. "en-US" or "fr_BE") or even a complex - string with quality values, e.g. - "fr-CH, fr;q=0.9, en;q=0.8, de;q=0.7, *;q=0.5". + list sorted by quality values, e.g. + ["fr-CH, "fr", "en", "de", "*"]. :param available_locales: A list containing the available locales. For example, a pygeoapi provider might only support ["de", "en"]. 
@@ -170,49 +170,12 @@ def get_match(locale_, available_locales_): if isinstance(accept_languages, Locale): # If a Babel Locale was used as input, transform back into a string - accept_languages = locale2str(accept_languages) + accept_languages = [locale2str(accept_languages)] - if not isinstance(accept_languages, str): + if not isinstance(accept_languages, list): # If `accept_languages` is not a string, ignore it LOGGER.debug(f"ignoring invalid accept-languages '{accept_languages}'") - accept_languages = '' - - tags = accept_languages.split(',') - num_tags = len(tags) - req_locales = {} - for i, lang in enumerate(tags): - q_raw = None - q_out = None - if not lang: - continue - - # Check if complex (i.e. with quality weights) - try: - lang, q_raw = (v.strip() for v in lang.split(';')) - except ValueError: - # Tuple unpacking failed: tag is not complex (or too complex :)) - pass - - # Validate locale tag - loc = str2locale(lang, True) - if not loc: - LOGGER.debug(f"ignoring invalid accept-language '{lang}'") - continue - - # Validate quality weight (e.g. 
"q=0.7") - if q_raw: - try: - q_out = float([v.strip() for v in q_raw.split('=')][1]) - except (ValueError, IndexError): - # Tuple unpacking failed: not a valid q tag - pass - - # If there's no actual q, set one based on the language order - if not q_out: - q_out = num_tags - i - - # Store locale - req_locales[q_out] = loc + accept_languages = [] # Process supported locales prv_locales = OrderedDict() @@ -221,7 +184,11 @@ def get_match(locale_, available_locales_): prv_locales.setdefault(loc.language, []).append(loc.territory) # Return best match from accepted languages - for _, loc in sorted(req_locales.items(), reverse=True): + for lang in accept_languages: + loc = str2locale(lang, True) + if not loc: + LOGGER.debug(f"ignoring invalid accept-language '{lang}'") + continue match = get_match(loc, prv_locales) if match: LOGGER.debug(f"'{match}' matches requested '{accept_languages}'") @@ -281,7 +248,7 @@ def translate(value, language: Union[Locale, str]): return value # Find best language match and return value by its key - out_locale = best_match(language, loc_items.keys()) + out_locale = best_match([language], loc_items.keys()) return value[loc_items[out_locale]] @@ -340,42 +307,6 @@ def _translate_dict(obj, level: int = 0): return result -def locale_from_headers(headers) -> str: - """ - Gets a valid Locale from a request headers dictionary. - Supported are complex strings (e.g. "fr-CH, fr;q=0.9, en;q=0.8"), - web locales (e.g. "en-US") or basic language tags (e.g. "en"). - A value of `None` is returned if the locale was not found or invalid. - - :param headers: Mapping of request headers. - - :returns: locale string or None - """ - - lang = {k.lower(): v for k, v in headers.items()}.get('accept-language') - if lang: - LOGGER.debug(f"Got locale '{lang}' from 'Accept-Language' header") - return lang - - -def locale_from_params(params) -> str: - """ - Gets a valid Locale from a request query parameters dictionary. - Supported are complex strings (e.g. 
def get_from_headers(headers: dict, header_name: str) -> str:
    """
    Gets case insensitive value from dictionary.
    This is particularly useful when trying to get
    headers from Starlette and Flask without issue

    Header keys are compared after stripping surrounding whitespace
    and lower-casing. If several keys normalize to the same name,
    the last one in iteration order wins.

    :param headers: `dict` of request headers.
    :param header_name: Name of request header.

    :returns: `str` value of header, or an empty string if not present
    """

    wanted = header_name.lower()
    found = ''
    for key, value in headers.items():
        if key.strip().lower() == wanted:
            found = value

    return found


def get_choice_from_headers(headers: dict,
                            header_name: str,
                            all: bool = False
                            ) -> Union[str, List[str], None]:
    """
    Gets choices from a request dictionary,
    considering numerical ordering of preferences.
    Supported are complex preference strings (e.g. "fr-CH, fr;q=0.9, en;q=0.8")

    Choices are ordered by descending q value; choices with an equal
    q value keep the order in which they appear in the header.
    A choice without a q value defaults to q=1.0. Choices with a
    malformed or out-of-range q value, or with q=0 (explicitly marked
    as not acceptable by the client), are discarded. Parameters other
    than q (e.g. "charset=utf-8") are ignored.

    :param headers: `dict` of request headers.
    :param header_name: Name of request header.
    :param all: bool to return one or all header values.

    :returns: Sorted choice or choices from header. `None` if the header
              is missing or empty. If the header holds no usable choice,
              `None` is returned (or an empty list if `all` is set).
    """

    # Select header of interest
    header = get_from_headers(headers=headers, header_name=header_name)
    if not header:
        return None

    # Parse choices, extracting optional q values (defaults to 1.0)
    choices = []
    for index, part in enumerate(header.split(',')):
        value, *params = (token.strip() for token in part.split(';'))
        if not value:
            # Empty element, e.g. caused by a trailing comma
            continue

        q_value = 1.0
        for param in params:
            name, _, raw = param.partition('=')
            if name.strip().lower() != 'q':
                continue
            try:
                q_value = float(raw)
            except ValueError:
                # Malformed weight: force the range check below to fail
                q_value = -1.0
            break

        # q=0 means "not acceptable", so it is excluded along with
        # anything outside of the valid (0, 1] range (including NaN)
        if 0 < q_value <= 1:
            choices.append((q_value, index, value))

    # Highest q value first, header position breaks ties
    choices.sort(key=lambda choice: (-choice[0], choice[1]))

    # Drop q value and index
    sorted_choices = [value for _, _, value in choices]

    # Return one or all choices
    if all:
        return sorted_choices

    return sorted_choices[0] if sorted_choices else None
== 'application/ld+json' + assert util.get_choice_from_headers( + {'accept-language': 'en_US', 'accept': '*/*'}, 'accept') == '*/*'