Skip to content

Implement q priority of request headers #1952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions pygeoapi/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
CrsTransformSpec, TEMPLATES, UrlPrefetcher, dategetter,
filter_dict_by_key_value, filter_providers_by_type, get_api_rules,
get_base_url, get_provider_by_type, get_provider_default, get_typed_value,
get_crs_from_uri, get_supported_crs_list, render_j2_template, to_json
get_crs_from_uri, get_supported_crs_list, render_j2_template, to_json,
get_choice_from_headers, get_from_headers
)

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -150,7 +151,8 @@ def apply_gzip(headers: dict, content: Union[str, bytes]) -> Union[str, bytes]:
Compress content if requested in header.
"""
charset = CHARSET[0]
if F_GZIP in headers.get('Content-Encoding', []):

if F_GZIP in get_from_headers(headers, 'content-encoding'):
try:
if isinstance(content, bytes):
# bytes means Content-Type needs to be set upstream
Expand Down Expand Up @@ -305,16 +307,18 @@ def _get_locale(self, headers, supported_locales):
raise ValueError(f"{self.__class__.__name__} must be initialized"
f"with a list of valid supported locales")

for func, mapping in ((l10n.locale_from_params, self._args),
(l10n.locale_from_headers, headers)):
loc_str = func(mapping)
if loc_str:
if not raw:
for mapping, field in ((self._args, l10n.QUERY_PARAM),
(headers, 'accept-language')):

loc_strs = get_choice_from_headers(mapping, field, all=True)
if loc_strs:
if raw is None:
# This is the first-found locale string: set as raw
raw = loc_str
raw = get_from_headers(mapping, field)

# Check if locale string is a good match for the UI
loc = l10n.best_match(loc_str, supported_locales)
is_override = func is l10n.locale_from_params
loc = l10n.best_match(loc_strs, supported_locales)
is_override = field is l10n.QUERY_PARAM
if loc != default_locale or is_override:
return raw, loc

Expand All @@ -335,17 +339,16 @@ def _get_format(self, headers) -> Union[str, None]:
return format_

# Format not specified: get from Accept headers (MIME types)
# e.g. format_ = 'text/html'
h = headers.get('accept', headers.get('Accept', '')).strip() # noqa
# e.g. Accept: 'text/html;q=0.5,application/ld+json'
types_ = get_choice_from_headers(headers, 'accept', all=True)
if types_ is None:
return

(fmts, mimes) = zip(*FORMAT_TYPES.items())
# basic support for complex types (i.e. with "q=0.x")
for type_ in (t.split(';')[0].strip() for t in h.split(',') if t):
for type_ in types_:
if type_ in mimes:
idx_ = mimes.index(type_)
format_ = fmts[idx_]
break

return format_ or None
return fmts[idx_]

@property
def data(self) -> bytes:
Expand Down Expand Up @@ -503,7 +506,7 @@ def get_response_headers(self, force_lang: l10n.Locale = None,
if F_GZIP in FORMAT_TYPES:
if force_encoding:
headers['Content-Encoding'] = force_encoding
elif F_GZIP in self._headers.get('Accept-Encoding', ''):
elif F_GZIP in get_from_headers(self._headers, 'accept-encoding'):
headers['Content-Encoding'] = F_GZIP

return headers
Expand Down
95 changes: 13 additions & 82 deletions pygeoapi/l10n.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def locale2str(value: Locale) -> str:

def best_match(accept_languages, available_locales) -> Locale:
"""
Takes an Accept-Languages string (from header or request query params)
Takes an Accept-Languages sorted list (from header or request query params)
and finds the best matching locale from a list of available locales.

This function provides a framework-independent alternative to the
Expand All @@ -131,12 +131,12 @@ def best_match(accept_languages, available_locales) -> Locale:
or unknown locale is ignored. However, if no
`available_locales` are specified, a `LocaleError` is raised.

:param accept_languages: A Locale or string with one or more languages.
:param accept_languages: A Locale or list of one or more languages.
This can be as simple as "de" for example,
but it's also possible to include a territory
(e.g. "en-US" or "fr_BE") or even a complex
string with quality values, e.g.
"fr-CH, fr;q=0.9, en;q=0.8, de;q=0.7, *;q=0.5".
list sorted by quality values, e.g.
["fr-CH", "fr", "en", "de", "*"].
:param available_locales: A list containing the available locales.
For example, a pygeoapi provider might only
support ["de", "en"].
Expand Down Expand Up @@ -170,49 +170,12 @@ def get_match(locale_, available_locales_):

if isinstance(accept_languages, Locale):
# If a Babel Locale was used as input, transform back into a string
accept_languages = locale2str(accept_languages)
accept_languages = [locale2str(accept_languages)]

if not isinstance(accept_languages, str):
if not isinstance(accept_languages, list):
# If `accept_languages` is not a string, ignore it
LOGGER.debug(f"ignoring invalid accept-languages '{accept_languages}'")
accept_languages = ''

tags = accept_languages.split(',')
num_tags = len(tags)
req_locales = {}
for i, lang in enumerate(tags):
q_raw = None
q_out = None
if not lang:
continue

# Check if complex (i.e. with quality weights)
try:
lang, q_raw = (v.strip() for v in lang.split(';'))
except ValueError:
# Tuple unpacking failed: tag is not complex (or too complex :))
pass

# Validate locale tag
loc = str2locale(lang, True)
if not loc:
LOGGER.debug(f"ignoring invalid accept-language '{lang}'")
continue

# Validate quality weight (e.g. "q=0.7")
if q_raw:
try:
q_out = float([v.strip() for v in q_raw.split('=')][1])
except (ValueError, IndexError):
# Tuple unpacking failed: not a valid q tag
pass

# If there's no actual q, set one based on the language order
if not q_out:
q_out = num_tags - i

# Store locale
req_locales[q_out] = loc
accept_languages = []

# Process supported locales
prv_locales = OrderedDict()
Expand All @@ -221,7 +184,11 @@ def get_match(locale_, available_locales_):
prv_locales.setdefault(loc.language, []).append(loc.territory)

# Return best match from accepted languages
for _, loc in sorted(req_locales.items(), reverse=True):
for lang in accept_languages:
loc = str2locale(lang, True)
if not loc:
LOGGER.debug(f"ignoring invalid accept-language '{lang}'")
continue
match = get_match(loc, prv_locales)
if match:
LOGGER.debug(f"'{match}' matches requested '{accept_languages}'")
Expand Down Expand Up @@ -281,7 +248,7 @@ def translate(value, language: Union[Locale, str]):
return value

# Find best language match and return value by its key
out_locale = best_match(language, loc_items.keys())
out_locale = best_match([language], loc_items.keys())
return value[loc_items[out_locale]]


Expand Down Expand Up @@ -340,42 +307,6 @@ def _translate_dict(obj, level: int = 0):
return result


def locale_from_headers(headers) -> str:
    """
    Looks up the raw Accept-Language value in a request headers mapping.
    The header name is matched case-insensitively and the stored value
    is handed back untouched (e.g. "fr-CH, fr;q=0.9, en;q=0.8", "en-US"
    or "en"); no parsing or validation of the value happens here.

    :param headers: Mapping of request headers.

    :returns: header value as found, or None if the header is absent
    """

    lang = None
    for name, value in headers.items():
        # Keep scanning after a hit: when several keys differ only by
        # case, the last one iterated is the one that counts
        if name.lower() == 'accept-language':
            lang = value

    if lang:
        LOGGER.debug(f"Got locale '{lang}' from 'Accept-Language' header")

    return lang


def locale_from_params(params) -> str:
    """
    Looks up the raw language value in a request query parameters mapping.
    The value stored under the `QUERY_PARAM` key is handed back untouched
    (e.g. "fr-CH, fr;q=0.9, en;q=0.8", "en-US" or "en"); no parsing or
    validation of the value happens here.

    :param params: Mapping of request query parameters.

    :returns: parameter value as found, or None if the key is absent
    """

    lang = params.get(QUERY_PARAM)
    if not lang:
        # Missing or empty: nothing worth logging, return it as-is
        return lang

    LOGGER.debug(f"Got locale '{lang}' from query parameter '{QUERY_PARAM}'")  # noqa
    return lang


def set_response_language(headers: dict, *locale_: Locale):
"""
Sets the Content-Language on the given HTTP response headers dict.
Expand Down
56 changes: 56 additions & 0 deletions pygeoapi/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from datetime import date, datetime, time, timezone
from decimal import Decimal
from enum import Enum
from heapq import heappush
import json
import logging
import mimetypes
Expand Down Expand Up @@ -1054,3 +1055,58 @@ def _inplace_replace_geometry_filter_name(
else:
_inplace_replace_geometry_filter_name(
sub_node, geometry_column_name)


def get_from_headers(headers: dict, header_name: str) -> str:
    """
    Gets case insensitive value from dictionary.
    This is particularly useful when trying to get
    headers from Starlette and Flask without issue

    :param headers: `dict` of request headers.
    :param header_name: Name of request header.

    :returns: `str` value of header, or an empty string if the header
              is missing or its value is `None`
    """

    if not headers:
        return ''

    # If several keys differ only by case/whitespace, the last one wins
    cleaned_headers = {k.strip().lower(): v for k, v in headers.items()}
    value = cleaned_headers.get(header_name.strip().lower(), '')

    # Callers run `in` checks on the result, so never hand back None
    return '' if value is None else value


def get_choice_from_headers(headers: dict,
                            header_name: str,
                            all: bool = False) -> Union[str, List[str], None]:
    """
    Gets choices from a request dictionary,
    considering numerical ordering of preferences.
    Supported are complex preference strings (e.g. "fr-CH, fr;q=0.9, en;q=0.8")

    Each comma-separated entry is reduced to its bare value (the part
    before the first ';'). An optional `q` parameter sets its weight
    (defaults to 1.0); any other parameters are ignored. Entries with a
    malformed weight, a weight of 0 or a weight outside of (0, 1] are
    discarded. Choices are ordered by descending weight; equal weights
    keep the order in which they appear in the header.

    :param headers: `dict` of request headers.
    :param header_name: Name of request header.
    :param all: bool to return one or all header values.

    :returns: Sorted choice or choices from header. `None` if the header
              is missing or empty. If the header holds no valid choice,
              an empty list is returned when `all` is set, else `None`.
    """

    # Select header of interest
    header = get_from_headers(headers=headers, header_name=header_name)
    if not header:
        return None

    # Parse choices, extracting optional q values (defaults to 1.0)
    choices = []
    for part in header.split(','):
        value, *params = (token.strip() for token in part.split(';'))
        if not value:
            continue

        q_value = 1.0
        for param in params:
            name, _, raw = param.partition('=')
            if name.strip().lower() != 'q':
                # Not a weight (e.g. "charset=utf-8"): ignore
                continue
            try:
                q_value = float(raw.strip())
            except ValueError:
                # Malformed weight (e.g. "q=1.2.3"): discard this entry
                q_value = None
            break

        # q=0 means "not acceptable"; NaN and out-of-range weights
        # fail this comparison as well
        if q_value is not None and 0 < q_value <= 1:
            choices.append((q_value, value))

    # Highest weight first; the sort is stable, so ties keep header order
    choices.sort(key=lambda choice: -choice[0])

    # Drop q value
    sorted_choices = [value for _, value in choices]

    # Return one or all choices
    if all:
        return sorted_choices
    return sorted_choices[0] if sorted_choices else None
29 changes: 29 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,32 @@ def test_modify_pygeofilter(
geometry_column_name=geometry_colum_name
)
assert result == expected


def test_get_choice_from_headers():
    """
    Checks that the preferred choice is picked from weighted request
    headers, whatever the capitalization of the header name.
    """

    base_headers = {
        'accept': 'text/html;q=0.5,application/ld+json',
        'accept-encoding': 'deflate;q=0.5,gzip'
    }
    pick = util.get_choice_from_headers

    # Header not present at all
    assert pick(base_headers, 'accept-language') is None

    # Test various capitalizations: (header key, header value, expected)
    language_cases = (
        ('accept-language', 'en;q=0.8,de;q=0.6,fr;q=0.4', 'en'),
        ('Accept-Language', 'en;q=0.8,de', 'de'),
        ('Accept-Language', 'en,de', 'en'),
        ('ACCEPT-LANGUAGE', 'en;q=0.8,de;q=0.2,fr', 'fr'),
        ('accept-language', 'en_US', 'en_US'),
    )
    for key, raw, expected in language_cases:
        request_headers = {**base_headers, key: raw}
        assert pick(request_headers, 'accept-language') == expected

    # Other weighted headers from the base dictionary
    for name, expected in (('accept-encoding', 'gzip'),
                           ('accept', 'application/ld+json')):
        assert pick(base_headers, name) == expected

    # Wildcard media type is returned as-is
    wildcard_headers = {'accept-language': 'en_US', 'accept': '*/*'}
    assert pick(wildcard_headers, 'accept') == '*/*'