Skip to content

Add feed_range in query_items API #41722

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.13.0b3 (Unreleased)

#### Features Added
* Added feed range support in `query_items`. See [PR 41722](https://github.com/Azure/azure-sdk-for-python/pull/41722).

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from azure.core.pipeline.transport._base import HttpRequest

from . import http_constants
from .partition_key import _Empty, _Undefined
from .partition_key import _Empty, _Undefined, PartitionKeyKind


# pylint: disable=protected-access
Expand Down Expand Up @@ -88,7 +88,7 @@ def should_extract_partition_key(self, container_cache: Optional[Dict[str, Any]]
if self._headers and http_constants.HttpHeaders.PartitionKey in self._headers:
current_partition_key = self._headers[http_constants.HttpHeaders.PartitionKey]
partition_key_definition = container_cache["partitionKey"] if container_cache else None
if partition_key_definition and partition_key_definition["kind"] == "MultiHash":
if partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
# A null in the multihash partition key indicates a failure in extracting partition keys
# from the document definition
return 'null' in current_partition_key
Expand All @@ -110,7 +110,7 @@ def _extract_partition_key(self, client: Optional[Any], container_cache: Optiona
elif options and isinstance(options["partitionKey"], _Empty):
new_partition_key = []
# else serialize using json dumps method which apart from regular values will serialize None into null
elif partition_key_definition and partition_key_definition["kind"] == "MultiHash":
elif partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
else:
new_partition_key = json.dumps([options["partitionKey"]])
Expand All @@ -131,7 +131,7 @@ async def _extract_partition_key_async(self, client: Optional[Any],
elif isinstance(options["partitionKey"], _Empty):
new_partition_key = []
# else serialize using json dumps method which apart from regular values will serialize None into null
elif partition_key_definition and partition_key_definition["kind"] == "MultiHash":
elif partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
else:
new_partition_key = json.dumps([options["partitionKey"]])
Expand Down
47 changes: 21 additions & 26 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import os
import urllib.parse
import uuid
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast
from typing_extensions import TypedDict
from urllib3.util.retry import Retry

Expand Down Expand Up @@ -60,6 +60,7 @@
from ._base import _build_properties_cache
from ._change_feed.change_feed_iterable import ChangeFeedIterable
from ._change_feed.change_feed_state import ChangeFeedState
from ._change_feed.feed_range_internal import FeedRangeInternalEpk
from ._constants import _Constants as Constants
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy
from ._cosmos_responses import CosmosDict, CosmosList
Expand All @@ -71,15 +72,12 @@
from .partition_key import (
_Undefined,
_Empty,
PartitionKey,
PartitionKeyKind,
PartitionKeyType,
SequentialPartitionKeyType,
_return_undefined_or_empty_partition_key,
NonePartitionKeyValue,
_get_partition_key_from_partition_key_definition
)

PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long


class CredentialDict(TypedDict, total=False):
masterKey: str
resourceTokens: Mapping[str, Any]
Expand Down Expand Up @@ -3160,23 +3158,20 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
request_params = RequestObject(resource_type, documents._OperationType.SqlQuery, req_headers)
request_params.set_excluded_location_from_options(options)

# check if query has prefix partition key
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs:
# Check if the over lapping ranges can be populated
feed_range_epk = None
if "feed_range" in kwargs:
feed_range = kwargs.pop("feed_range")
feed_range_epk = FeedRangeInternalEpk.from_json(feed_range).get_normalized_range()
elif "prefix_partition_key_object" in kwargs and "prefix_partition_key_value" in kwargs:
prefix_partition_key_obj = kwargs.pop("prefix_partition_key_object")
prefix_partition_key_value: SequentialPartitionKeyType = kwargs.pop("prefix_partition_key_value")
feed_range_epk = prefix_partition_key_obj.get_epk_range_for_prefix_partition_key(prefix_partition_key_value)

# If feed_range_epk exist, query with the range
if feed_range_epk is not None:
last_response_headers = CaseInsensitiveDict()
# here get the over lapping ranges
# Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs
pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {})
partition_key_definition = _get_partition_key_from_partition_key_definition(pk_properties)
partition_key_value: Sequence[
Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] = cast(
Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]],
pk_properties.get("partition_key")
)
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(
partition_key_value
) # cspell:disable-line
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feedrangeEPK],
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feed_range_epk],
options)
# It is possible to get more than one over lapping range. We need to get the query results for each one
results: Dict[str, Any] = {}
Expand All @@ -3193,8 +3188,8 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
# Since the range min and max are all Upper Cased string Hex Values,
# we can compare the values lexicographically
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min),
range_max=min(single_range.max, feedrangeEPK.max),
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feed_range_epk.min),
range_max=min(single_range.max, feed_range_epk.max),
isMinInclusive=True, isMaxInclusive=False)
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
# The Epk Sub Range spans exactly one physical partition
Expand Down Expand Up @@ -3339,7 +3334,7 @@ def _ExtractPartitionKey(
partitionKeyDefinition: Mapping[str, Any],
document: Mapping[str, Any]
) -> Union[List[Optional[Union[str, float, bool]]], str, float, bool, _Empty, _Undefined]:
if partitionKeyDefinition["kind"] == "MultiHash":
if partitionKeyDefinition["kind"] == PartitionKeyKind.MULTI_HASH:
ret: List[Optional[Union[str, float, bool]]] = []
for partition_key_level in partitionKeyDefinition["paths"]:
# Parses the paths into a list of token each representing a property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""
from typing import TYPE_CHECKING, Optional

from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition
from azure.cosmos.partition_key import get_partition_key_from_definition
from azure.cosmos._global_partition_endpoint_manager_circuit_breaker_core import \
_GlobalPartitionEndpointManagerForCircuitBreakerCore

Expand Down Expand Up @@ -62,12 +62,12 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
# get relevant information from container cache to get the overlapping ranges
container_link = properties["container_link"]
partition_key_definition = properties["partitionKey"]
partition_key = _get_partition_key_from_partition_key_definition(partition_key_definition)
partition_key = get_partition_key_from_definition(partition_key_definition)

if HttpHeaders.PartitionKey in request.headers:
partition_key_value = request.headers[HttpHeaders.PartitionKey]
# get the partition key range for the given partition key
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
epk_range = [partition_key.get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
partition_ranges = (self.client._routing_map_provider # pylint: disable=protected-access
.get_overlapping_ranges(container_link, epk_range))
partition_range = Range.PartitionKeyRangeToRange(partition_ranges[0])
Expand Down
66 changes: 61 additions & 5 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
import base64
import json
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional, Tuple

from ._version import VERSION


def get_user_agent(suffix: Optional[str]) -> str:
def get_user_agent(suffix: Optional[str] = None) -> str:
os_name = safe_user_agent_header(platform.platform())
python_version = safe_user_agent_header(platform.python_version())
user_agent = "azsdk-python-cosmos/{} Python/{} ({})".format(VERSION, python_version, os_name)
if suffix:
user_agent += f" {suffix}"
return user_agent

def get_user_agent_async(suffix: Optional[str]) -> str:
def get_user_agent_async(suffix: Optional[str] = None) -> str:
os_name = safe_user_agent_header(platform.platform())
python_version = safe_user_agent_header(platform.python_version())
user_agent = "azsdk-python-cosmos-async/{} Python/{} ({})".format(VERSION, python_version, os_name)
Expand All @@ -49,7 +49,7 @@ def get_user_agent_async(suffix: Optional[str]) -> str:
return user_agent


def safe_user_agent_header(s: Optional[str]) -> str:
def safe_user_agent_header(s: Optional[str] = None) -> str:
if s is None:
s = "unknown"
# remove all white spaces
Expand All @@ -59,7 +59,7 @@ def safe_user_agent_header(s: Optional[str]) -> str:
return s


def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]:
def get_index_metrics_info(delimited_string: Optional[str] = None) -> Dict[str, Any]:
if delimited_string is None:
return {}
try:
Expand All @@ -76,3 +76,59 @@ def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]:

def current_time_millis() -> int:
return int(round(time.time() * 1000))

def add_args_to_kwargs(
arg_names: List[str],
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> None:
"""Add positional arguments(args) to keyword argument dictionary(kwargs) using names in arg_names as keys.
To be backward-compatible, some expected positional arguments has to be allowed. This method will verify number of
maximum positional arguments and add them to the keyword argument dictionary(kwargs)

:param List[str] arg_names: The names of positional arguments.
:param Tuple[Any, ...] args: The tuple of positional arguments.
:param Dict[str, Any] kwargs: The dictionary of keyword arguments as reference. This dictionary will be updated.
"""

if len(args) > len(arg_names):
raise ValueError(f"Positional argument is out of range. Expected {len(arg_names)} arguments, "
f"but got {len(args)} instead. Please review argument list in API documentation.")

for name, arg in zip(arg_names, args):
if name in kwargs:
raise ValueError(f"{name} cannot be used as positional and keyword argument at the same time.")
kwargs[name] = arg


def format_list_with_and(items: List[str]) -> str:
"""Format a list of items into a string with commas and 'and' for the last item.

:param List[str] items: The list of items to format.
:return: A formatted string with items separated by commas and 'and' before the last item.
:rtype: str
"""
formatted_items = ""
quoted = [f"'{item}'" for item in items]
if len(quoted) > 2:
formatted_items = ", ".join(quoted[:-1]) + ", and " + quoted[-1]
elif len(quoted) == 2:
formatted_items = " and ".join(quoted)
elif quoted:
formatted_items = quoted[0]
return formatted_items

def verify_exclusive_arguments(
exclusive_keys: List[str],
**kwargs: Dict[str, Any]) -> None:
"""Verify if exclusive arguments are present in kwargs.
For some Cosmos SDK APIs, some arguments are exclusive, or cannot be used at the same time. This method will verify
that and raise an error if exclusive arguments are present.

:param List[str] exclusive_keys: The names of exclusive arguments.
"""
keys_in_kwargs = [key for key in exclusive_keys if key in kwargs and kwargs[key] is not None]

if len(keys_in_kwargs) > 1:
raise ValueError(f"{format_list_with_and(keys_in_kwargs)} are exclusive parameters, "
f"please only set one of them.")
Loading