diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 1949488630a3..2328fb11e6e3 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.13.0b3 (Unreleased) #### Features Added +* Added feed range support in `query_items`. See [PR 41722](https://github.com/Azure/azure-sdk-for-python/pull/41722). #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py index 53ee57b8c3f8..e36adfa5f323 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py @@ -28,7 +28,7 @@ from azure.core.pipeline.transport._base import HttpRequest from . import http_constants -from .partition_key import _Empty, _Undefined +from .partition_key import _Empty, _Undefined, _PartitionKeyKind # pylint: disable=protected-access @@ -88,7 +88,7 @@ def should_extract_partition_key(self, container_cache: Optional[Dict[str, Any]] if self._headers and http_constants.HttpHeaders.PartitionKey in self._headers: current_partition_key = self._headers[http_constants.HttpHeaders.PartitionKey] partition_key_definition = container_cache["partitionKey"] if container_cache else None - if partition_key_definition and partition_key_definition["kind"] == "MultiHash": + if partition_key_definition and partition_key_definition["kind"] == _PartitionKeyKind.MULTI_HASH: # A null in the multihash partition key indicates a failure in extracting partition keys # from the document definition return 'null' in current_partition_key @@ -110,7 +110,7 @@ def _extract_partition_key(self, client: Optional[Any], container_cache: Optiona elif options and isinstance(options["partitionKey"], _Empty): new_partition_key = [] # else serialize using json dumps method which apart from regular values will serialize None 
into null - elif partition_key_definition and partition_key_definition["kind"] == "MultiHash": + elif partition_key_definition and partition_key_definition["kind"] == _PartitionKeyKind.MULTI_HASH: new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':')) else: new_partition_key = json.dumps([options["partitionKey"]]) @@ -131,7 +131,7 @@ async def _extract_partition_key_async(self, client: Optional[Any], elif isinstance(options["partitionKey"], _Empty): new_partition_key = [] # else serialize using json dumps method which apart from regular values will serialize None into null - elif partition_key_definition and partition_key_definition["kind"] == "MultiHash": + elif partition_key_definition and partition_key_definition["kind"] == _PartitionKeyKind.MULTI_HASH: new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':')) else: new_partition_key = json.dumps([options["partitionKey"]]) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index a39433984af3..69ada1d563d6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -26,7 +26,7 @@ import os import urllib.parse import uuid -from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type +from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast from typing_extensions import TypedDict from urllib3.util.retry import Retry @@ -60,6 +60,7 @@ from ._base import _build_properties_cache from ._change_feed.change_feed_iterable import ChangeFeedIterable from ._change_feed.change_feed_state import ChangeFeedState +from ._change_feed.feed_range_internal import FeedRangeInternalEpk from ._constants import _Constants as Constants from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy from 
._cosmos_responses import CosmosDict, CosmosList @@ -71,15 +72,12 @@ from .partition_key import ( _Undefined, _Empty, - PartitionKey, + _PartitionKeyKind, + _PartitionKeyType, + _SequentialPartitionKeyType, _return_undefined_or_empty_partition_key, - NonePartitionKeyValue, - _get_partition_key_from_partition_key_definition ) -PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long - - class CredentialDict(TypedDict, total=False): masterKey: str resourceTokens: Mapping[str, Any] @@ -1062,7 +1060,7 @@ def QueryItems( database_or_container_link: str, query: Optional[Union[str, Dict[str, Any]]], options: Optional[Mapping[str, Any]] = None, - partition_key: Optional[PartitionKeyType] = None, + partition_key: Optional[_PartitionKeyType] = None, response_hook: Optional[Callable[[Mapping[str, Any], Dict[str, Any]], None]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: @@ -3160,23 +3158,21 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: request_params = RequestObject(resource_type, documents._OperationType.SqlQuery, req_headers) request_params.set_excluded_location_from_options(options) - # check if query has prefix partition key - isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None) - if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs: + # Check if the over lapping ranges can be populated + feed_range_epk = None + if "feed_range" in kwargs: + feed_range = kwargs.pop("feed_range") + feed_range_epk = FeedRangeInternalEpk.from_json(feed_range).get_normalized_range() + elif "prefix_partition_key_object" in kwargs and "prefix_partition_key_value" in kwargs: + prefix_partition_key_obj = kwargs.pop("prefix_partition_key_object") + prefix_partition_key_value: _SequentialPartitionKeyType = kwargs.pop("prefix_partition_key_value") + feed_range_epk = ( + 
prefix_partition_key_obj._get_epk_range_for_prefix_partition_key(prefix_partition_key_value)) + + # If feed_range_epk exist, query with the range + if feed_range_epk is not None: last_response_headers = CaseInsensitiveDict() - # here get the over lapping ranges - # Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs - pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {}) - partition_key_definition = _get_partition_key_from_partition_key_definition(pk_properties) - partition_key_value: Sequence[ - Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] = cast( - Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], - pk_properties.get("partition_key") - ) - feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key( - partition_key_value - ) # cspell:disable-line - over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feedrangeEPK], + over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feed_range_epk], options) # It is possible to get more than one over lapping range. 
We need to get the query results for each one results: Dict[str, Any] = {} @@ -3193,8 +3189,8 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range) # Since the range min and max are all Upper Cased string Hex Values, # we can compare the values lexicographically - EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min), - range_max=min(single_range.max, feedrangeEPK.max), + EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feed_range_epk.min), + range_max=min(single_range.max, feed_range_epk.max), isMinInclusive=True, isMaxInclusive=False) if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max: # The Epk Sub Range spans exactly one physical partition @@ -3339,7 +3335,7 @@ def _ExtractPartitionKey( partitionKeyDefinition: Mapping[str, Any], document: Mapping[str, Any] ) -> Union[List[Optional[Union[str, float, bool]]], str, float, bool, _Empty, _Undefined]: - if partitionKeyDefinition["kind"] == "MultiHash": + if partitionKeyDefinition["kind"] == _PartitionKeyKind.MULTI_HASH: ret: List[Optional[Union[str, float, bool]]] = [] for partition_key_level in partitionKeyDefinition["paths"]: # Parses the paths into a list of token each representing a property diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py index fbe8a56d8477..87df417b4f93 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py @@ -23,7 +23,7 @@ """ from typing import TYPE_CHECKING, Optional -from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition +from azure.cosmos.partition_key import 
get_partition_key_from_definition from azure.cosmos._global_partition_endpoint_manager_circuit_breaker_core import \ _GlobalPartitionEndpointManagerForCircuitBreakerCore @@ -62,7 +62,7 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK # get relevant information from container cache to get the overlapping ranges container_link = properties["container_link"] partition_key_definition = properties["partitionKey"] - partition_key = _get_partition_key_from_partition_key_definition(partition_key_definition) + partition_key = get_partition_key_from_definition(partition_key_definition) if HttpHeaders.PartitionKey in request.headers: partition_key_value = request.headers[HttpHeaders.PartitionKey] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py index f1899415af87..9aa4209422cc 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py @@ -27,12 +27,12 @@ import base64 import json import time -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple from ._version import VERSION -def get_user_agent(suffix: Optional[str]) -> str: +def get_user_agent(suffix: Optional[str] = None) -> str: os_name = safe_user_agent_header(platform.platform()) python_version = safe_user_agent_header(platform.python_version()) user_agent = "azsdk-python-cosmos/{} Python/{} ({})".format(VERSION, python_version, os_name) @@ -40,7 +40,7 @@ def get_user_agent(suffix: Optional[str]) -> str: user_agent += f" {suffix}" return user_agent -def get_user_agent_async(suffix: Optional[str]) -> str: +def get_user_agent_async(suffix: Optional[str] = None) -> str: os_name = safe_user_agent_header(platform.platform()) python_version = safe_user_agent_header(platform.python_version()) user_agent = "azsdk-python-cosmos-async/{} Python/{} ({})".format(VERSION, python_version, os_name) @@ -49,7 +49,7 @@ def get_user_agent_async(suffix: 
Optional[str]) -> str: return user_agent -def safe_user_agent_header(s: Optional[str]) -> str: +def safe_user_agent_header(s: Optional[str] = None) -> str: if s is None: s = "unknown" # remove all white spaces @@ -59,7 +59,7 @@ def safe_user_agent_header(s: Optional[str]) -> str: return s -def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]: +def get_index_metrics_info(delimited_string: Optional[str] = None) -> Dict[str, Any]: if delimited_string is None: return {} try: @@ -76,3 +76,59 @@ def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]: def current_time_millis() -> int: return int(round(time.time() * 1000)) + +def add_args_to_kwargs( + arg_names: List[str], + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> None: + """Add positional arguments(args) to keyword argument dictionary(kwargs) using names in arg_names as keys. + To be backward-compatible, some expected positional arguments has to be allowed. This method will verify number of + maximum positional arguments and add them to the keyword argument dictionary(kwargs) + + :param List[str] arg_names: The names of positional arguments. + :param Tuple[Any, ...] args: The tuple of positional arguments. + :param Dict[str, Any] kwargs: The dictionary of keyword arguments as reference. This dictionary will be updated. + """ + + if len(args) > len(arg_names): + raise ValueError(f"Positional argument is out of range. Expected {len(arg_names)} arguments, " + f"but got {len(args)} instead. Please review argument list in API documentation.") + + for name, arg in zip(arg_names, args): + if name in kwargs: + raise ValueError(f"{name} cannot be used as positional and keyword argument at the same time.") + kwargs[name] = arg + + +def format_list_with_and(items: List[str]) -> str: + """Format a list of items into a string with commas and 'and' for the last item. + + :param List[str] items: The list of items to format. 
+ :return: A formatted string with items separated by commas and 'and' before the last item. + :rtype: str + """ + formatted_items = "" + quoted = [f"'{item}'" for item in items] + if len(quoted) > 2: + formatted_items = ", ".join(quoted[:-1]) + ", and " + quoted[-1] + elif len(quoted) == 2: + formatted_items = " and ".join(quoted) + elif quoted: + formatted_items = quoted[0] + return formatted_items + +def verify_exclusive_arguments( + exclusive_keys: List[str], + **kwargs: Dict[str, Any]) -> None: + """Verify if exclusive arguments are present in kwargs. + For some Cosmos SDK APIs, some arguments are exclusive, or cannot be used at the same time. This method will verify + that and raise an error if exclusive arguments are present. + + :param List[str] exclusive_keys: The names of exclusive arguments. + """ + keys_in_kwargs = [key for key in exclusive_keys if key in kwargs and kwargs[key] is not None] + + if len(keys_in_kwargs) > 1: + raise ValueError(f"{format_list_with_and(keys_in_kwargs)} are exclusive parameters, " + f"please only set one of them.") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 3cac81623f00..762729b883e1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -34,6 +34,7 @@ from ._cosmos_client_connection_async import CosmosClientConnection from ._scripts import ScriptsProxy +from .. 
import _utils as utils from .._base import ( build_options as _build_options, validate_cache_staleness_value, @@ -52,7 +53,7 @@ _return_undefined_or_empty_partition_key, _Empty, _Undefined, - _get_partition_key_from_partition_key_definition + get_partition_key_from_definition ) __all__ = ("ContainerProxy",) @@ -154,7 +155,7 @@ async def _get_epk_range_for_partition_key( feed_options: Optional[Dict[str, Any]] = None) -> Range: container_properties = await self._get_properties_with_options(feed_options) partition_key_definition = container_properties["partitionKey"] - partition_key = _get_partition_key_from_partition_key_definition(partition_key_definition) + partition_key = get_partition_key_from_definition(partition_key_definition) return partition_key._get_epk_range_for_partition_key(partition_key_value) @@ -423,26 +424,26 @@ def read_all_items( ) return items - @distributed_trace + @overload def query_items( - self, - query: str, - *, - parameters: Optional[List[Dict[str, object]]] = None, - partition_key: Optional[PartitionKeyType] = None, - max_item_count: Optional[int] = None, - enable_scan_in_query: Optional[bool] = None, - populate_query_metrics: Optional[bool] = None, - populate_index_metrics: Optional[bool] = None, - session_token: Optional[str] = None, - initial_headers: Optional[Dict[str, str]] = None, - max_integrated_cache_staleness_in_ms: Optional[int] = None, - priority: Optional[Literal["High", "Low"]] = None, - continuation_token_limit: Optional[int] = None, - response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, - throughput_bucket: Optional[int] = None, - **kwargs: Any - ) -> AsyncItemPaged[Dict[str, Any]]: + self, + query: str, + *, + continuation_token_limit: Optional[int] = None, + enable_scan_in_query: Optional[bool] = None, + initial_headers: Optional[Dict[str, str]] = None, + max_integrated_cache_staleness_in_ms: Optional[int] = None, + max_item_count: Optional[int] = None, + parameters: 
Optional[List[Dict[str, object]]] = None, + partition_key: Optional[PartitionKeyType] = None, + populate_index_metrics: Optional[bool] = None, + populate_query_metrics: Optional[bool] = None, + priority: Optional[Literal["High", "Low"]] = None, + response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, + session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, + **kwargs: Any + ): """Return all results matching the given `query`. You can use any value for the container name in the FROM clause, but @@ -451,40 +452,189 @@ def query_items( the WHERE clause. :param str query: The Azure Cosmos DB SQL query to execute. + :keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query + response. Valid values are positive integers. + A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + indexing was opted out on the requested paths. + :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. + If all preferred locations were excluded, primary/hub location will be used. + This excluded_location will override existing excluded_locations in client level. + :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. + :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in + milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword parameters: Optional array of parameters to the query. 
Each parameter is a dict() with 'name' and 'value' keys. Ignored if no query is provided. - :paramtype parameters: List[Dict[str, Any]] - :keyword partition_key: Specifies the partition key value for the item. If none is provided, - a cross-partition query will be executed. + :paramtype parameters: [List[Dict[str, object]]] + :keyword partition_key: Partition key at which the query request is targeted. :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] - :keyword int max_item_count: Max number of items to be returned in the enumeration operation. - :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as - indexing was opted out on the requested paths. - :keyword bool populate_query_metrics: Enable returning query metrics in response headers. :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used - existing indexes and how it could use potential new indexes. Please note that this options will incur + existing indexes and how it could use potential new indexes. Please note that this option will incur overhead, so it should be enabled only when debugging slow queries. - :keyword str session_token: Token for use with Session consistency. - :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. + :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + request. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword response_hook: A callable invoked with the response metadata. 
:paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. + :returns: An Iterable of items (dicts). + :rtype: ItemPaged[Dict[str, Any]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/examples_async.py + :start-after: [START query_items] + :end-before: [END query_items] + :language: python + :dedent: 0 + :caption: Get all products that have not been discontinued: + + .. literalinclude:: ../samples/examples_async.py + :start-after: [START query_items_param] + :end-before: [END query_items_param] + :language: python + :dedent: 0 + :caption: Parameterized query to get all products that have been discontinued: + """ + ... + + @overload + def query_items( + self, + query: str, + *, + continuation_token_limit: Optional[int] = None, + enable_scan_in_query: Optional[bool] = None, + feed_range: Optional[Dict[str, Any]] = None, + initial_headers: Optional[Dict[str, str]] = None, + max_integrated_cache_staleness_in_ms: Optional[int] = None, + max_item_count: Optional[int] = None, + parameters: Optional[List[Dict[str, object]]] = None, + populate_index_metrics: Optional[bool] = None, + populate_query_metrics: Optional[bool] = None, + priority: Optional[Literal["High", "Low"]] = None, + response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, + session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, + **kwargs: Any + ): + """Return all results matching the given `query`. + + You can use any value for the container name in the FROM clause, but + often the container name is used. In the examples below, the container + name is "products," and is aliased as "p" for easier referencing in + the WHERE clause. + + :param str query: The Azure Cosmos DB SQL query to execute. 
:keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query response. Valid values are positive integers. A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + indexing was opted out on the requested paths. + :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. + If all preferred locations were excluded, primary/hub location will be used. + This excluded_location will override existing excluded_locations in client level. + :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. + :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. + :keyword parameters: Optional array of parameters to the query. + Each parameter is a dict() with 'name' and 'value' keys. + Ignored if no query is provided. + :paramtype parameters: [List[Dict[str, object]]] + :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used + existing indexes and how it could use potential new indexes. Please note that this option will incur + overhead, so it should be enabled only when debugging slow queries. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. 
Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword int throughput_bucket: The desired throughput bucket for the client + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. + :returns: An Iterable of items (dicts). + :rtype: ItemPaged[Dict[str, Any]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/examples_async.py + :start-after: [START query_items] + :end-before: [END query_items] + :language: python + :dedent: 0 + :caption: Get all products that have not been discontinued: + + .. literalinclude:: ../samples/examples_async.py + :start-after: [START query_items_param] + :end-before: [END query_items_param] + :language: python + :dedent: 0 + :caption: Parameterized query to get all products that have been discontinued: + """ + ... + + @distributed_trace + def query_items( + self, + *args: Any, + **kwargs: Any + ) -> AsyncItemPaged[Dict[str, Any]]: + """Return all results matching the given `query`. + + You can use any value for the container name in the FROM clause, but + often the container name is used. In the examples below, the container + name is "products," and is aliased as "p" for easier referencing in + the WHERE clause. + + :param Any args: args + :keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query + response. Valid values are positive integers. + A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_cross_partition_query: Allows sending of more than one request to + execute the query in the Azure Cosmos DB service. 
+ More than one request is necessary if the query is not scoped to single partition key value. + :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + indexing was opted out on the requested paths. :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations - in this list are specified as the names of the azure Cosmos locations like, 'West US', 'East US' and so on. + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. If all preferred locations were excluded, primary/hub location will be used. This excluded_location will override existing excluded_locations in client level. - :returns: An AsyncItemPaged of items (dicts). - :rtype: AsyncItemPaged[Dict[str, Any]] + :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. + :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. + :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in + milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. + :keyword parameters: Optional array of parameters to the query. + Each parameter is a dict() with 'name' and 'value' keys. + Ignored if no query is provided. + :paramtype parameters: [List[Dict[str, object]]] + :keyword partition_key: Partition key at which the query request is targeted. + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used + existing indexes and how it could use potential new indexes. 
Please note that this option will incur + overhead, so it should be enabled only when debugging slow queries. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. + :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + request. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword str query: The Azure Cosmos DB SQL query to execute. + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. + :returns: An Iterable of items (dicts). + :rtype: ItemPaged[Dict[str, Any]] .. admonition:: Example: @@ -494,7 +644,6 @@ def query_items( :language: python :dedent: 0 :caption: Get all products that have not been discontinued: - :name: query_items .. 
literalinclude:: ../samples/examples_async.py :start-after: [START query_items_param] @@ -502,45 +651,53 @@ def query_items( :language: python :dedent: 0 :caption: Parameterized query to get all products that have been discontinued: - :name: query_items_param """ - if session_token is not None: - kwargs['session_token'] = session_token - if initial_headers is not None: - kwargs['initial_headers'] = initial_headers - if priority is not None: - kwargs['priority'] = priority - if throughput_bucket is not None: - kwargs["throughput_bucket"] = throughput_bucket + original_positional_arg_names = ["query"] + utils.add_args_to_kwargs(original_positional_arg_names, args, kwargs) feed_options = _build_options(kwargs) - if max_item_count is not None: - feed_options["maxItemCount"] = max_item_count - if populate_query_metrics is not None: - feed_options["populateQueryMetrics"] = populate_query_metrics - if populate_index_metrics is not None: - feed_options["populateIndexMetrics"] = populate_index_metrics - if enable_scan_in_query is not None: - feed_options["enableScanInQuery"] = enable_scan_in_query + + # Update 'feed_options' from 'kwargs' + if "max_item_count" in kwargs: + feed_options["maxItemCount"] = kwargs.pop("max_item_count") + if "populate_query_metrics" in kwargs: + feed_options["populateQueryMetrics"] = kwargs.pop("populate_query_metrics") + if "populate_index_metrics" in kwargs: + feed_options["populateIndexMetrics"] = kwargs.pop("populate_index_metrics") + if "enable_scan_in_query" in kwargs: + feed_options["enableScanInQuery"] = kwargs.pop("enable_scan_in_query") + if "max_integrated_cache_staleness_in_ms" in kwargs: + max_integrated_cache_staleness_in_ms = kwargs.pop("max_integrated_cache_staleness_in_ms") + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms + if "continuation_token_limit" in kwargs: + feed_options["responseContinuationTokenLimitInKb"] = 
kwargs.pop("continuation_token_limit") + feed_options["correlatedActivityId"] = GenerateGuidId() + + # Set query with 'query' and 'parameters' from kwargs + if "parameters" in kwargs: + query = {"query": kwargs.pop("query", None), "parameters": kwargs.pop("parameters", None)} + else: + query = kwargs.pop("query", None) + + # Set method to get/cache container properties kwargs["containerProperties"] = self._get_properties_with_options - if partition_key is not None: - feed_options["partitionKey"] = self._set_partition_key(partition_key) + + utils.verify_exclusive_arguments(["feed_range", "partition_key"], **kwargs) + partition_key = None + if "feed_range" not in kwargs and "partition_key" in kwargs: + partition_key_value = kwargs.pop("partition_key") + feed_options["partitionKey"] = self._set_partition_key(partition_key_value) else: feed_options["enableCrossPartitionQuery"] = True - if max_integrated_cache_staleness_in_ms: - validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) - feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms - correlated_activity_id = GenerateGuidId() - feed_options["correlatedActivityId"] = correlated_activity_id - if continuation_token_limit is not None: - feed_options["responseContinuationTokenLimitInKb"] = continuation_token_limit + + # Set 'response_hook' + response_hook = kwargs.pop("response_hook", None) if response_hook and hasattr(response_hook, "clear"): response_hook.clear() - if self.container_link in self.__get_client_container_caches(): - feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] items = self.client_connection.QueryItems( database_or_container_link=self.container_link, - query=query if parameters is None else {"query": query, "parameters": parameters}, + query=query, options=feed_options, partition_key=partition_key, response_hook=response_hook, diff --git 
a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index bbe1d5f0fae8..b43155853c07 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -56,6 +56,7 @@ from .. import documents from .._change_feed.aio.change_feed_iterable import ChangeFeedIterable from .._change_feed.change_feed_state import ChangeFeedState +from .._change_feed.feed_range_internal import FeedRangeInternalEpk from .._routing import routing_range from ..documents import ConnectionPolicy, DatabaseAccount from .._constants import _Constants as Constants @@ -71,9 +72,11 @@ from .. import _utils from ..partition_key import ( _Undefined, + _PartitionKeyKind, + _SequentialPartitionKeyType, _return_undefined_or_empty_partition_key, NonePartitionKeyValue, _Empty, - _get_partition_key_from_partition_key_definition + _build_partition_key_from_properties, ) from ._auth_policy_async import AsyncCosmosBearerTokenCredentialPolicy from .._cosmos_http_logging_policy import CosmosHttpLoggingPolicy @@ -2897,10 +2900,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: return [] initial_headers = self.default_headers.copy() - cont_prop_func = kwargs.pop("containerProperties", None) - cont_prop = None - if cont_prop_func: - cont_prop = await cont_prop_func(options) # get properties with feed options + container_property_func = kwargs.pop("containerProperties", None) + container_property = None + if container_property_func: + container_property = await container_property_func(options) # get properties with feed options # Copy to make sure that default_headers won't be changed. 
if query is None: @@ -2950,22 +2953,22 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: request_params = _request_object.RequestObject(typ, documents._OperationType.SqlQuery, req_headers) request_params.set_excluded_location_from_options(options) - # check if query has prefix partition key - partition_key_value = options.get("partitionKey", None) - is_prefix_partition_query = False - partition_key_obj = None - if cont_prop and partition_key_value is not None: - partition_key_definition = cont_prop["partitionKey"] - partition_key_obj = _get_partition_key_from_partition_key_definition(partition_key_definition) - is_prefix_partition_query = partition_key_obj._is_prefix_partition_key(partition_key_value) - - if is_prefix_partition_query and partition_key_obj: - # here get the overlapping ranges - req_headers.pop(http_constants.HttpHeaders.PartitionKey, None) - feed_range_epk = partition_key_obj._get_epk_range_for_prefix_partition_key( - partition_key_value) # cspell:disable-line - over_lapping_ranges = await self._routing_map_provider.get_overlapping_ranges(id_, - [feed_range_epk], + # Check if the over lapping ranges can be populated + feed_range_epk = None + if "feed_range" in kwargs: + feed_range = kwargs.pop("feed_range") + feed_range_epk = FeedRangeInternalEpk.from_json(feed_range).get_normalized_range() + elif options.get("partitionKey") is not None and container_property is not None: + # check if query has prefix partition key + partition_key_value = options["partitionKey"] + partition_key_obj = _build_partition_key_from_properties(container_property) + if partition_key_obj.is_prefix_partition_key(partition_key_value): + req_headers.pop(http_constants.HttpHeaders.PartitionKey, None) + partition_key_value = cast(_SequentialPartitionKeyType, partition_key_value) + feed_range_epk = partition_key_obj._get_epk_range_for_prefix_partition_key(partition_key_value) + + if feed_range_epk is not None: + over_lapping_ranges = await 
self._routing_map_provider.get_overlapping_ranges(id_, [feed_range_epk], options) results: Dict[str, Any] = {} # For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over @@ -3180,7 +3183,7 @@ async def _AddPartitionKey(self, collection_link, document, options): # Extracts the partition key from the document using the partitionKey definition def _ExtractPartitionKey(self, partitionKeyDefinition, document): - if partitionKeyDefinition["kind"] == "MultiHash": + if partitionKeyDefinition["kind"] == _PartitionKeyKind.MULTI_HASH: ret = [] for partition_key_level in partitionKeyDefinition.get("paths"): # Parses the paths into a list of token each representing a property diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py index b03426ddde08..9737b34fdd06 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py @@ -23,7 +23,7 @@ """ from typing import TYPE_CHECKING, Optional -from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition +from azure.cosmos.partition_key import get_partition_key_from_definition from azure.cosmos._global_partition_endpoint_manager_circuit_breaker_core import \ _GlobalPartitionEndpointManagerForCircuitBreakerCore from azure.cosmos._routing.routing_range import PartitionKeyRangeWrapper, Range @@ -60,7 +60,7 @@ async def create_pk_range_wrapper(self, request: RequestObject) -> Optional[Part # get relevant information from container cache to get the overlapping ranges container_link = properties["container_link"] partition_key_definition = properties["partitionKey"] - partition_key = 
_get_partition_key_from_partition_key_definition(partition_key_definition) + partition_key = get_partition_key_from_definition(partition_key_definition) if HttpHeaders.PartitionKey in request.headers: partition_key_value = request.headers[HttpHeaders.PartitionKey] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index e498501d5e51..bc16316086b8 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -23,7 +23,7 @@ """ import warnings from datetime import datetime -from typing import Any, Dict, List, Optional, Sequence, Union, Tuple, Mapping, Type, cast, overload, Iterable, Callable +from typing import Any, Dict, List, Optional, Sequence, Union, Tuple, Mapping, cast, overload, Iterable, Callable from typing_extensions import Literal from azure.core import MatchConditions @@ -31,6 +31,7 @@ from azure.core.tracing.decorator import distributed_trace from azure.cosmos._change_feed.change_feed_utils import add_args_to_kwargs, validate_kwargs +from . 
import _utils as utils from ._base import ( build_options, validate_cache_staleness_value, @@ -48,10 +49,10 @@ from .partition_key import ( NonePartitionKeyValue, PartitionKey, - _Empty, - _Undefined, + _PartitionKeyType, + _SequentialPartitionKeyType, + _build_partition_key_from_properties, _return_undefined_or_empty_partition_key, - _get_partition_key_from_partition_key_definition ) from .scripts import ScriptsProxy @@ -61,20 +62,10 @@ # pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs # pylint: disable=docstring-keyword-should-match-keyword-only -PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long - -def get_partition_key_from_properties(container_properties: Dict[str, Any]) -> PartitionKey: - partition_key_definition = container_properties["partitionKey"] - return _get_partition_key_from_partition_key_definition(partition_key_definition) - -def is_prefix_partition_key(container_properties: Dict[str, Any], partition_key: PartitionKeyType) -> bool: - partition_key_obj: PartitionKey = get_partition_key_from_properties(container_properties) - return partition_key_obj._is_prefix_partition_key(partition_key) - def get_epk_range_for_partition_key( container_properties: Dict[str, Any], - partition_key_value: PartitionKeyType) -> Range: - partition_key_obj: PartitionKey = get_partition_key_from_properties(container_properties) + partition_key_value: _PartitionKeyType) -> Range: + partition_key_obj: PartitionKey = _build_partition_key_from_properties(container_properties) return partition_key_obj._get_epk_range_for_partition_key(partition_key_value) class ContainerProxy: # pylint: disable=too-many-public-methods @@ -150,8 +141,8 @@ def _get_conflict_link(self, conflict_or_link: Union[str, Mapping[str, Any]]) -> def _set_partition_key( self, - partition_key: PartitionKeyType - ) -> Union[str, int, float, bool, 
List[Union[str, int, float, bool]], _Empty, _Undefined]: + partition_key: _PartitionKeyType + ) -> _PartitionKeyType: if partition_key == NonePartitionKeyValue: return _return_undefined_or_empty_partition_key(self.is_system_key) return cast(Union[str, int, float, bool, List[Union[str, int, float, bool]]], partition_key) @@ -220,7 +211,7 @@ def read( # pylint:disable=docstring-missing-param def read_item( # pylint:disable=docstring-missing-param self, item: Union[str, Mapping[str, Any]], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, populate_query_metrics: Optional[bool] = None, post_trigger_include: Optional[str] = None, *, @@ -368,7 +359,7 @@ def query_items_change_feed( *, max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, priority: Optional[Literal["High", "Low"]] = None, mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, @@ -577,7 +568,7 @@ def query_items_change_feed( container_properties = self._get_properties_with_options(feed_options) if "partition_key" in kwargs: partition_key = kwargs.pop("partition_key") - change_feed_state_context["partitionKey"] = self._set_partition_key(cast(PartitionKeyType, partition_key)) + change_feed_state_context["partitionKey"] = self._set_partition_key(cast(_PartitionKeyType, partition_key)) change_feed_state_context["partitionKeyFeedRange"] = \ get_epk_range_for_partition_key(container_properties, partition_key) if "feed_range" in kwargs: @@ -597,27 +588,27 @@ def query_items_change_feed( ) return result - @distributed_trace - def query_items( # pylint:disable=docstring-missing-param - self, - query: str, - parameters: Optional[List[Dict[str, object]]] = None, - partition_key: Optional[PartitionKeyType] = None, - enable_cross_partition_query: Optional[bool] = None, 
- max_item_count: Optional[int] = None, - enable_scan_in_query: Optional[bool] = None, - populate_query_metrics: Optional[bool] = None, - *, - populate_index_metrics: Optional[bool] = None, - session_token: Optional[str] = None, - initial_headers: Optional[Dict[str, str]] = None, - max_integrated_cache_staleness_in_ms: Optional[int] = None, - priority: Optional[Literal["High", "Low"]] = None, - continuation_token_limit: Optional[int] = None, - throughput_bucket: Optional[int] = None, - response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, - **kwargs: Any - ) -> ItemPaged[Dict[str, Any]]: + @overload + def query_items( + self, + query: str, + *, + continuation_token_limit: Optional[int] = None, + enable_cross_partition_query: Optional[bool] = None, + enable_scan_in_query: Optional[bool] = None, + initial_headers: Optional[Dict[str, str]] = None, + max_integrated_cache_staleness_in_ms: Optional[int] = None, + max_item_count: Optional[int] = None, + parameters: Optional[List[Dict[str, object]]] = None, + partition_key: Optional[_PartitionKeyType] = None, + populate_index_metrics: Optional[bool] = None, + populate_query_metrics: Optional[bool] = None, + priority: Optional[Literal["High", "Low"]] = None, + response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, + session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, + **kwargs: Any + ): """Return all results matching the given `query`. You can use any value for the container name in the FROM clause, but @@ -626,40 +617,194 @@ def query_items( # pylint:disable=docstring-missing-param the WHERE clause. :param str query: The Azure Cosmos DB SQL query to execute. - :param parameters: Optional array of parameters to the query. - Each parameter is a dict() with 'name' and 'value' keys. - Ignored if no query is provided. - :type parameters: [List[Dict[str, object]]] - :param partition_key: partition key at which the query request is targeted. 
- :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] - :param bool enable_cross_partition_query: Allows sending of more than one request to + :keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query + response. Valid values are positive integers. + A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_cross_partition_query: Allows sending of more than one request to execute the query in the Azure Cosmos DB service. More than one request is necessary if the query is not scoped to single partition key value. - :param int max_item_count: Max number of items to be returned in the enumeration operation. - :param bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as indexing was opted out on the requested paths. - :param bool populate_query_metrics: Enable returning query metrics in response headers. - :keyword str session_token: Token for use with Session consistency. + :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. + If all preferred locations were excluded, primary/hub location will be used. + This excluded_location will override existing excluded_locations in client level. :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. + :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in + milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. 
+ :keyword parameters: Optional array of parameters to the query. + Each parameter is a dict() with 'name' and 'value' keys. + Ignored if no query is provided. + :paramtype parameters: [List[Dict[str, object]]] + :keyword partition_key: Partition key at which the query request is targeted. + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used + existing indexes and how it could use potential new indexes. Please note that this option will incur + overhead, so it should be enabled only when debugging slow queries. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. + :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + request. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. + :returns: An Iterable of items (dicts). + :rtype: ItemPaged[Dict[str, Any]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/examples.py + :start-after: [START query_items] + :end-before: [END query_items] + :language: python + :dedent: 0 + :caption: Get all products that have not been discontinued: + + .. literalinclude:: ../samples/examples.py + :start-after: [START query_items_param] + :end-before: [END query_items_param] + :language: python + :dedent: 0 + :caption: Parameterized query to get all products that have been discontinued: + """ + ... 
+ + @overload + def query_items( + self, + query: str, + *, + continuation_token_limit: Optional[int] = None, + enable_cross_partition_query: Optional[bool] = None, + enable_scan_in_query: Optional[bool] = None, + feed_range: Optional[Dict[str, Any]] = None, + initial_headers: Optional[Dict[str, str]] = None, + max_integrated_cache_staleness_in_ms: Optional[int] = None, + max_item_count: Optional[int] = None, + parameters: Optional[List[Dict[str, object]]] = None, + populate_index_metrics: Optional[bool] = None, + populate_query_metrics: Optional[bool] = None, + priority: Optional[Literal["High", "Low"]] = None, + response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, + session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, + **kwargs: Any + ): + """Return all results matching the given `query`. + + You can use any value for the container name in the FROM clause, but + often the container name is used. In the examples below, the container + name is "products," and is aliased as "p" for easier referencing in + the WHERE clause. + + :param str query: The Azure Cosmos DB SQL query to execute. :keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query response. Valid values are positive integers. A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_cross_partition_query: Allows sending of more than one request to + execute the query in the Azure Cosmos DB service. + More than one request is necessary if the query is not scoped to single partition key value. + :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + indexing was opted out on the requested paths. + :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. 
+ If all preferred locations were excluded, primary/hub location will be used. + This excluded_location will override existing excluded_locations in client level. + :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. + :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. + :keyword parameters: Optional array of parameters to the query. + Each parameter is a dict() with 'name' and 'value' keys. + Ignored if no query is provided. + :paramtype parameters: [List[Dict[str, object]]] + :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used + existing indexes and how it could use potential new indexes. Please note that this option will incur + overhead, so it should be enabled only when debugging slow queries. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used - existing indexes and how it could use potential new indexes. Please note that this options will incur - overhead, so it should be enabled only when debugging slow queries. 
- :keyword int throughput_bucket: The desired throughput bucket for the client + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. + :returns: An Iterable of items (dicts). + :rtype: ItemPaged[Dict[str, Any]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/examples.py + :start-after: [START query_items] + :end-before: [END query_items] + :language: python + :dedent: 0 + :caption: Get all products that have not been discontinued: + + .. literalinclude:: ../samples/examples.py + :start-after: [START query_items_param] + :end-before: [END query_items_param] + :language: python + :dedent: 0 + :caption: Parameterized query to get all products that have been discontinued: + """ + ... + + @distributed_trace + def query_items( # pylint:disable=docstring-missing-param + self, + *args: Any, + **kwargs: Any + ) -> ItemPaged[Dict[str, Any]]: + """Return all results matching the given `query`. + + You can use any value for the container name in the FROM clause, but + often the container name is used. In the examples below, the container + name is "products," and is aliased as "p" for easier referencing in + the WHERE clause. + + :param Any args: args + :keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query + response. Valid values are positive integers. + A value of 0 is the same as not passing a value (default no limit). + :keyword bool enable_cross_partition_query: Allows sending of more than one request to + execute the query in the Azure Cosmos DB service. + More than one request is necessary if the query is not scoped to single partition key value. 
+ :keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as + indexing was opted out on the requested paths. :keyword list[str] excluded_locations: Excluded locations to be skipped from preferred locations. The locations - in this list are specified as the names of the azure Cosmos locations like, 'West US', 'East US' and so on. + in this list are specified as the names of the Azure Cosmos locations like, 'West US', 'East US' and so on. If all preferred locations were excluded, primary/hub location will be used. This excluded_location will override existing excluded_locations in client level. + :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. + :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. + :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in + milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. + :keyword int max_item_count: Max number of items to be returned in the enumeration operation. + :keyword parameters: Optional array of parameters to the query. + Each parameter is a dict() with 'name' and 'value' keys. + Ignored if no query is provided. + :paramtype parameters: [List[Dict[str, object]]] + :keyword partition_key: Partition key at which the query request is targeted. + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used + existing indexes and how it could use potential new indexes. Please note that this option will incur + overhead, so it should be enabled only when debugging slow queries. + :keyword bool populate_query_metrics: Enable returning query metrics in response headers. 
+ :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + request. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword str query: The Azure Cosmos DB SQL query to execute. + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client. :returns: An Iterable of items (dicts). :rtype: ItemPaged[Dict[str, Any]] @@ -679,47 +824,64 @@ def query_items( # pylint:disable=docstring-missing-param :dedent: 0 :caption: Parameterized query to get all products that have been discontinued: """ - if session_token is not None: - kwargs['session_token'] = session_token - if initial_headers is not None: - kwargs['initial_headers'] = initial_headers - if priority is not None: - kwargs['priority'] = priority - if throughput_bucket is not None: - kwargs["throughputBucket"] = throughput_bucket + # Add positional arguments to keyword argument to support backward compatibility. 
+ original_positional_arg_names = ["query", "parameters", "partition_key", "enable_cross_partition_query", + "max_item_count", "enable_scan_in_query", "populate_query_metrics"] + utils.add_args_to_kwargs(original_positional_arg_names, args, kwargs) feed_options = build_options(kwargs) - if enable_cross_partition_query is not None: - feed_options["enableCrossPartitionQuery"] = enable_cross_partition_query - if max_item_count is not None: - feed_options["maxItemCount"] = max_item_count - if populate_query_metrics is not None: - feed_options["populateQueryMetrics"] = populate_query_metrics - if populate_index_metrics is not None: - feed_options["populateIndexMetrics"] = populate_index_metrics - properties = self._get_properties_with_options(feed_options) - if partition_key is not None: - partition_key_value = self._set_partition_key(partition_key) - if is_prefix_partition_key(properties, partition_key): - kwargs["isPrefixPartitionQuery"] = True - kwargs["partitionKeyDefinition"] = properties["partitionKey"] - kwargs["partitionKeyDefinition"]["partition_key"] = partition_key_value - else: - feed_options["partitionKey"] = partition_key_value - if enable_scan_in_query is not None: - feed_options["enableScanInQuery"] = enable_scan_in_query - if max_integrated_cache_staleness_in_ms: + + # Get container property and init client container caches + container_properties = self._get_properties_with_options(feed_options) + + # Update 'feed_options' from 'kwargs' + if "enable_cross_partition_query" in kwargs: + feed_options["enableCrossPartitionQuery"] = kwargs.pop("enable_cross_partition_query") + if "max_item_count" in kwargs: + feed_options["maxItemCount"] = kwargs.pop("max_item_count") + if "populate_query_metrics" in kwargs: + feed_options["populateQueryMetrics"] = kwargs.pop("populate_query_metrics") + if "populate_index_metrics" in kwargs: + feed_options["populateIndexMetrics"] = kwargs.pop("populate_index_metrics") + if "enable_scan_in_query" in kwargs: + 
feed_options["enableScanInQuery"] = kwargs.pop("enable_scan_in_query") + if "max_integrated_cache_staleness_in_ms" in kwargs: + max_integrated_cache_staleness_in_ms = kwargs.pop("max_integrated_cache_staleness_in_ms") validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms - correlated_activity_id = GenerateGuidId() - feed_options["correlatedActivityId"] = correlated_activity_id - if continuation_token_limit is not None: - feed_options["responseContinuationTokenLimitInKb"] = continuation_token_limit + if "continuation_token_limit" in kwargs: + feed_options["responseContinuationTokenLimitInKb"] = kwargs.pop("continuation_token_limit") + feed_options["correlatedActivityId"] = GenerateGuidId() + feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] + + # Set query with 'query' and 'parameters' from kwargs + if "parameters" in kwargs: + query = {"query": kwargs.pop("query", None), "parameters": kwargs.pop("parameters", None)} + else: + query = kwargs.pop("query", None) + + # Set range filters for query. Options are either 'feed_range' or 'partition_key' + utils.verify_exclusive_arguments(["feed_range", "partition_key"], **kwargs) + if "feed_range" not in kwargs and "partition_key" in kwargs: + partition_key_value = self._set_partition_key(kwargs.pop("partition_key")) + partition_key_obj = _build_partition_key_from_properties(container_properties) + if partition_key_obj.is_prefix_partition_key(partition_key_value): + kwargs["prefix_partition_key_object"] = partition_key_obj + kwargs["prefix_partition_key_value"] = cast(_SequentialPartitionKeyType, partition_key_value) + else: + # Add to feed_options, only when feed_range not given and partition_key was not prefixed partition_key + feed_options["partitionKey"] = partition_key_value + + # Set 'partition_key' for QueryItems method. 
This can be 'None' if feed range or prefix partition key was set + partition_key = feed_options.get("partitionKey") + + # Set 'response_hook' + response_hook = kwargs.pop("response_hook", None) if response_hook and hasattr(response_hook, "clear"): response_hook.clear() - feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] + items = self.client_connection.QueryItems( database_or_container_link=self.container_link, - query=query if parameters is None else {"query": query, "parameters": parameters}, + query=query, options=feed_options, partition_key=partition_key, response_hook=response_hook, @@ -1006,7 +1168,7 @@ def create_item( # pylint:disable=docstring-missing-param def patch_item( self, item: Union[str, Dict[str, Any]], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, patch_operations: List[Dict[str, Any]], *, filter_predicate: Optional[str] = None, @@ -1095,7 +1257,7 @@ def patch_item( def execute_item_batch( self, batch_operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], Dict[str, Any]]]], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, *, pre_trigger_include: Optional[str] = None, post_trigger_include: Optional[str] = None, @@ -1167,7 +1329,7 @@ def execute_item_batch( def delete_item( # pylint:disable=docstring-missing-param self, item: Union[Mapping[str, Any], str], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, populate_query_metrics: Optional[bool] = None, pre_trigger_include: Optional[str] = None, post_trigger_include: Optional[str] = None, @@ -1359,7 +1521,7 @@ def query_conflicts( query: str, parameters: Optional[List[Dict[str, object]]] = None, enable_cross_partition_query: Optional[bool] = None, - partition_key: Optional[PartitionKeyType] = None, + partition_key: Optional[_PartitionKeyType] = None, max_item_count: Optional[int] = None, *, response_hook: Optional[Callable[[Mapping[str, Any], 
ItemPaged[Dict[str, Any]]], None]] = None, @@ -1405,7 +1567,7 @@ def query_conflicts( def get_conflict( self, conflict: Union[str, Mapping[str, Any]], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, **kwargs: Any ) -> Dict[str, Any]: """Get the conflict identified by `conflict`. @@ -1433,7 +1595,7 @@ def get_conflict( def delete_conflict( self, conflict: Union[str, Mapping[str, Any]], - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, **kwargs: Any ) -> None: """Delete a specified conflict from the container. @@ -1462,7 +1624,7 @@ def delete_conflict( @distributed_trace def delete_all_items_by_partition_key( self, - partition_key: PartitionKeyType, + partition_key: _PartitionKeyType, *, pre_trigger_include: Optional[str] = None, post_trigger_include: Optional[str] = None, @@ -1583,7 +1745,7 @@ def get_latest_session_token( """ return get_latest_session_token(feed_ranges_to_session_tokens, target_feed_range) - def feed_range_from_partition_key(self, partition_key: PartitionKeyType) -> Dict[str, Any]: + def feed_range_from_partition_key(self, partition_key: _PartitionKeyType) -> Dict[str, Any]: """ Gets the feed range for a given partition key. :param partition_key: partition key to get feed range. 
:type partition_key: PartitionKeyType diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py index 6a6454aaf3ea..e2c287367b25 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py @@ -23,8 +23,7 @@ from io import BytesIO import binascii import struct -from typing import IO, Sequence, Type, Union, overload, List, cast, Dict, Any - +from typing import Any, Dict, IO, List, Sequence, Type, Union, cast, overload from typing_extensions import Literal from ._cosmos_integers import _UInt32, _UInt64, _UInt128 @@ -69,6 +68,13 @@ class _PartitionKeyComponentType: Float = 0x14 Infinity = 0xFF +class _PartitionKeyKind: + HASH: str = "Hash" + MULTI_HASH: str = "MultiHash" + +class _PartitionKeyVersion: + V1: int = 1 + V2: int = 2 class NonePartitionKeyValue: """Represents None value for partitionKey when it's missing in a container. @@ -90,6 +96,9 @@ class _Undefined: class _Infinity: """Represents infinity value for partitionKey.""" +_SingularPartitionKeyType = Union[None, bool, float, int, str, Type[NonePartitionKeyValue], _Empty, _Undefined] +_SequentialPartitionKeyType = Sequence[_SingularPartitionKeyType] +_PartitionKeyType = Union[_SingularPartitionKeyType, _SequentialPartitionKeyType] class PartitionKey(dict): """Key used to partition a container into logical partitions. @@ -125,17 +134,22 @@ class PartitionKey(dict): """ @overload - def __init__(self, path: List[str], *, kind: Literal["MultiHash"] = "MultiHash", version: int = 2) -> None: + def __init__(self, path: List[str], *, kind: Literal["MultiHash"] = "MultiHash", + version: int = _PartitionKeyVersion.V2 + ) -> None: ... @overload - def __init__(self, path: str, *, kind: Literal["Hash"] = "Hash", version: int = 2) -> None: + def __init__(self, path: str, *, kind: Literal["Hash"] = "Hash", + version:int = _PartitionKeyVersion.V2 + ) -> None: ... 
def __init__(self, *args, **kwargs): path = args[0] if args else kwargs['path'] - kind = args[1] if len(args) > 1 else kwargs.get('kind', 'Hash' if isinstance(path, str) else 'MultiHash') - version = args[2] if len(args) > 2 else kwargs.get('version', 2) + kind = args[1] if len(args) > 1 else kwargs.get('kind', _PartitionKeyKind.HASH if isinstance(path, str) + else _PartitionKeyKind.MULTI_HASH) + version = args[2] if len(args) > 2 else kwargs.get('version', _PartitionKeyVersion.V2) super().__init__(paths=[path] if isinstance(path, str) else path, kind=kind, version=version) def __repr__(self) -> str: @@ -151,7 +165,7 @@ def kind(self, value: Literal["MultiHash", "Hash"]) -> None: @property def path(self) -> str: - if self.kind == "MultiHash": + if self.kind == _PartitionKeyKind.MULTI_HASH: return ''.join(self["paths"]) return self["paths"][0] @@ -172,9 +186,9 @@ def version(self, value: int) -> None: def _get_epk_range_for_prefix_partition_key( self, - pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + pk_value: _SequentialPartitionKeyType ) -> _Range: - if self.kind != "MultiHash": + if self.kind != _PartitionKeyKind.MULTI_HASH: raise ValueError( "Effective Partition Key Range for Prefix Partition Keys is only supported for Hierarchical Partition Keys.") # pylint: disable=line-too-long len_pk_value = len(pk_value) @@ -198,33 +212,31 @@ def _get_epk_range_for_prefix_partition_key( def _get_epk_range_for_partition_key( self, - pk_value: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long + pk_value: _PartitionKeyType ) -> _Range: - if self._is_prefix_partition_key(pk_value): + if self.is_prefix_partition_key(pk_value): return self._get_epk_range_for_prefix_partition_key( - cast(Sequence[Union[None, bool, int, float, str, Type[NonePartitionKeyValue]]], pk_value)) + cast(_SequentialPartitionKeyType, pk_value)) # else return point range if 
isinstance(pk_value, (list, tuple)) or (isinstance(pk_value, Sequence) and not isinstance(pk_value, str)): - effective_partition_key_string = self._get_effective_partition_key_string( - cast(Sequence[Union[None, bool, int, float, str, Type[NonePartitionKeyValue]]], pk_value)) + effective_partition_key_string = self._get_effective_partition_key_string(pk_value) else: effective_partition_key_string =\ - self._get_effective_partition_key_string( - cast(List[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], [pk_value])) + self._get_effective_partition_key_string([pk_value]) return _Range(effective_partition_key_string, effective_partition_key_string, True, True) @staticmethod def _truncate_for_v1_hashing( - value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]] - ) -> Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]: + value: _SingularPartitionKeyType + ) -> _SingularPartitionKeyType: if isinstance(value, str): return value[:100] return value @staticmethod def _get_effective_partition_key_for_hash_partitioning( - pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + pk_value: Union[str, _SequentialPartitionKeyType] ) -> str: truncated_components = [] # In Python, Strings are sequences, so we make sure we instead hash the entire string instead of each character @@ -247,44 +259,50 @@ def _get_effective_partition_key_for_hash_partitioning( partition_key_components = [hash_value] + truncated_components return _to_hex_encoded_binary_string_v1(partition_key_components) - def _get_effective_partition_key_string( - self, - pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + @staticmethod + def _get_hashed_partition_key_string( + pk_value: _SequentialPartitionKeyType, + kind: str, + version: int = _PartitionKeyVersion.V2, ) -> Union[int, str]: if not pk_value: return _MinimumInclusiveEffectivePartitionKey - 
if isinstance(self, _Infinity): - return _MaximumExclusiveEffectivePartitionKey - - kind = self.kind - if kind == 'Hash': - version = self.version or 2 - if version == 1: + if kind == _PartitionKeyKind.HASH: + if version == _PartitionKeyVersion.V1: return PartitionKey._get_effective_partition_key_for_hash_partitioning(pk_value) - if version == 2: + if version == _PartitionKeyVersion.V2: return PartitionKey._get_effective_partition_key_for_hash_partitioning_v2(pk_value) - elif kind == 'MultiHash': - return self._get_effective_partition_key_for_multi_hash_partitioning_v2(pk_value) + elif kind == _PartitionKeyKind.MULTI_HASH: + return PartitionKey._get_effective_partition_key_for_multi_hash_partitioning_v2(pk_value) return _to_hex_encoded_binary_string(pk_value) + def _get_effective_partition_key_string( + self, + pk_value: _SequentialPartitionKeyType + ) -> Union[int, str]: + if isinstance(self, _Infinity): + return _MaximumExclusiveEffectivePartitionKey + + return PartitionKey._get_hashed_partition_key_string(pk_value=pk_value, kind=self.kind, version=self.version) + @staticmethod def _write_for_hashing( - value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + value: _SingularPartitionKeyType, writer: IO[bytes] ) -> None: PartitionKey._write_for_hashing_core(value, bytes([0]), writer) @staticmethod def _write_for_hashing_v2( - value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + value: _SingularPartitionKeyType, writer: IO[bytes] ) -> None: PartitionKey._write_for_hashing_core(value, bytes([0xFF]), writer) @staticmethod def _write_for_hashing_core( - value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + value: _SingularPartitionKeyType, string_suffix: bytes, writer: IO[bytes] ) -> None: @@ -310,7 +328,7 @@ def _write_for_hashing_core( @staticmethod def _get_effective_partition_key_for_hash_partitioning_v2( - pk_value: Sequence[Union[None, bool, int, float, str, 
_Undefined, Type[NonePartitionKeyValue]]] + pk_value: _SequentialPartitionKeyType ) -> str: with BytesIO() as ms: for component in pk_value: @@ -329,7 +347,7 @@ def _get_effective_partition_key_for_hash_partitioning_v2( @staticmethod def _get_effective_partition_key_for_multi_hash_partitioning_v2( - pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + pk_value: _SequentialPartitionKeyType ) -> str: sb = [] for value in pk_value: @@ -351,10 +369,10 @@ def _get_effective_partition_key_for_multi_hash_partitioning_v2( return ''.join(sb).upper() - def _is_prefix_partition_key( + def is_prefix_partition_key( self, - partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]]) -> bool: # pylint: disable=line-too-long - if self.kind != "MultiHash": + partition_key: _PartitionKeyType) -> bool: # pylint: disable=line-too-long + if self.kind != _PartitionKeyKind.MULTI_HASH: return False ret = ((isinstance(partition_key, Sequence) and not isinstance(partition_key, str)) and len(self['paths']) != len(partition_key)) @@ -377,6 +395,7 @@ def _to_hex_encoded_binary_string(components: Sequence[object]) -> str: for component in components: if isinstance(component, (bool, int, float, str, _Infinity, _Undefined)): + component = cast(_SingularPartitionKeyType, component) _write_for_binary_encoding(component, ms) else: raise TypeError(f"Unexpected type for PK component: {type(component)}") @@ -387,6 +406,7 @@ def _to_hex_encoded_binary_string_v1(components: Sequence[object]) -> str: ms = BytesIO() for component in components: if isinstance(component, (bool, int, float, str, _Infinity, _Undefined)): + component = cast(_SingularPartitionKeyType, component) _write_for_binary_encoding_v1(component, ms) else: raise TypeError(f"Unexpected type for PK component: {type(component)}") @@ -394,7 +414,7 @@ def _to_hex_encoded_binary_string_v1(components: Sequence[object]) -> str: return 
_to_hex(bytearray(ms.getvalue()), 0, ms.tell()) def _write_for_binary_encoding_v1( - value: Union[bool, int, float, str, _Infinity, _Undefined], + value: _SingularPartitionKeyType, binary_writer: IO[bytes] ) -> None: if isinstance(value, bool): @@ -447,7 +467,7 @@ def _write_for_binary_encoding_v1( binary_writer.write(bytes([_PartitionKeyComponentType.Undefined])) def _write_for_binary_encoding( - value: Union[bool, int, float, str, _Infinity, _Undefined], + value: _SingularPartitionKeyType, binary_writer: IO[bytes] ) -> None: if isinstance(value, bool): @@ -496,8 +516,7 @@ def _write_for_binary_encoding( elif isinstance(value, _Undefined): binary_writer.write(bytes([_PartitionKeyComponentType.Undefined])) - -def _get_partition_key_from_partition_key_definition( +def get_partition_key_from_definition( partition_key_definition: Union[Dict[str, Any], "PartitionKey"] ) -> "PartitionKey": """Internal method to create a PartitionKey instance from a dictionary or PartitionKey object. @@ -511,3 +530,7 @@ def _get_partition_key_from_partition_key_definition( kind = partition_key_definition.get("kind", "Hash") version: int = partition_key_definition.get("version", 1) # Default to version 1 if not provided return PartitionKey(path=path, kind=kind, version=version) + +def _build_partition_key_from_properties(container_properties: Dict[str, Any]) -> PartitionKey: + partition_key_definition = container_properties["partitionKey"] + return get_partition_key_from_definition(partition_key_definition) diff --git a/sdk/cosmos/azure-cosmos/samples/examples.py b/sdk/cosmos/azure-cosmos/samples/examples.py index 6f51ae61bba8..b0d1597a69a9 100644 --- a/sdk/cosmos/azure-cosmos/samples/examples.py +++ b/sdk/cosmos/azure-cosmos/samples/examples.py @@ -137,6 +137,20 @@ container.delete_item(queried_item, partition_key="Widget") # [END delete_items] +# Query items with feed range is also supported. This example +# gets all items within the feed range. 
+# [START query_items_feed_range]
+import json
+
+for feed_range in container.read_feed_ranges():
+    for queried_item in container.query_items(
+        query='SELECT * FROM c',
+        enable_cross_partition_query=True,
+        feed_range=feed_range,
+    ):
+        print(json.dumps(queried_item, indent=True))
+# [END query_items_feed_range]
+
 # Retrieve the properties of a database
 # [START get_database_properties]
 properties = database.read()
diff --git a/sdk/cosmos/azure-cosmos/samples/examples_async.py b/sdk/cosmos/azure-cosmos/samples/examples_async.py
index d67230df7238..c37a2809f35e 100644
--- a/sdk/cosmos/azure-cosmos/samples/examples_async.py
+++ b/sdk/cosmos/azure-cosmos/samples/examples_async.py
@@ -7,6 +7,7 @@
 
 from azure.cosmos import exceptions, PartitionKey
 from azure.cosmos.aio import CosmosClient
+import json
 import os
 
@@ -97,8 +98,6 @@ async def examples_async():
    # The asynchronous client returns asynchronous iterators for its query methods;
    # as such, we iterate over it by using an async for loop
    # [START query_items]
-    import json
-
    async for queried_item in container.query_items(
            query='SELECT * FROM products p WHERE p.productModel <> "DISCONTINUED"'
    ):
@@ -247,8 +246,6 @@ async def examples_async():
    # Query the items in a container using SQL-like syntax. This example
    # gets all items whose product model hasn't been discontinued.
    # [START query_items]
-    import json
-
    async for queried_item in container.query_items(
            query='SELECT * FROM location l WHERE l.state = "WA"'
    ):
@@ -262,6 +259,16 @@
    await container.delete_item(item_dict, partition_key=["GA", "Atlanta", 30363])
    # [END delete_items]
 
+    # Query items with feed range is also supported. This example
+    # gets all items within the feed range.
+    # [START query_items_feed_range]
+    async for feed_range in container.read_feed_ranges():
+        async for queried_item in container.query_items(
+                query='SELECT * from c',
+                feed_range=feed_range):
+            print(json.dumps(queried_item, indent=True))
+    # [END query_items_feed_range]
+
    # Get the feed ranges list from container.
    # [START read_feed_ranges]
    feed_ranges = [feed_range async for feed_range in container.read_feed_ranges()]
diff --git a/sdk/cosmos/azure-cosmos/tests/test_change_feed.py b/sdk/cosmos/azure-cosmos/tests/test_change_feed.py
index 94713d543003..0d8d5b6ed312 100644
--- a/sdk/cosmos/azure-cosmos/tests/test_change_feed.py
+++ b/sdk/cosmos/azure-cosmos/tests/test_change_feed.py
@@ -51,7 +51,6 @@ def test_get_feed_ranges(self, setup):
        assert len(result) == 1
 
    @pytest.mark.parametrize("change_feed_filter_param", ["partitionKey", "partitionKeyRangeId", "feedRange"])
-    # @pytest.mark.parametrize("change_feed_filter_param", ["partitionKeyRangeId"])
    def test_query_change_feed_with_different_filter(self, change_feed_filter_param, setup):
        created_collection = setup["created_db"].create_container(f"change_feed_test_{change_feed_filter_param}_{str(uuid.uuid4())}",
                                                                  PartitionKey(path="/pk"))
diff --git a/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation.py b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation.py
index b55071064d91..abc31b406787 100644
--- a/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation.py
+++ b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation.py
@@ -7,7 +7,7 @@
 import azure.cosmos.cosmos_client as cosmos_client
 import test_config
-from azure.cosmos.partition_key import PartitionKey, _get_partition_key_from_partition_key_definition
+from azure.cosmos.partition_key import PartitionKey, get_partition_key_from_definition
 from azure.cosmos.container import get_epk_range_for_partition_key
 
 @pytest.mark.cosmosEmulator
@@ -249,7 +249,7 @@ def _get_properties_override():
                                            "in the partition key definition.")
 
        # Create a PartitionKey instance from the definition and validate
-        partition_key_instance = _get_partition_key_from_partition_key_definition(partition_key_definition)
+        partition_key_instance = get_partition_key_from_definition(partition_key_definition)
        assert partition_key_instance.kind == "Hash", "Partition key kind mismatch."
        assert partition_key_instance.version == 1, "Partition key version mismatch."
diff --git a/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py
index 48d00337ae7f..5e3b2bc728f8 100644
--- a/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py
+++ b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py
@@ -3,7 +3,7 @@
 import pytest
 from azure.cosmos.aio import CosmosClient
 import test_config
-from azure.cosmos.partition_key import PartitionKey, _get_partition_key_from_partition_key_definition
+from azure.cosmos.partition_key import PartitionKey, get_partition_key_from_definition
 
 @pytest.mark.cosmosEmulator
 @pytest.mark.asyncio
@@ -248,7 +248,7 @@ async def _get_properties_override():
                                            "in the partition key definition.")
 
        # Create a PartitionKey instance from the definition and validate
-        partition_key_instance = _get_partition_key_from_partition_key_definition(partition_key_definition)
+        partition_key_instance = get_partition_key_from_definition(partition_key_definition)
        assert partition_key_instance.kind == "Hash", "Partition key kind mismatch."
        assert partition_key_instance.version == 1, "Partition key version mismatch."
diff --git a/sdk/cosmos/azure-cosmos/tests/test_config.py b/sdk/cosmos/azure-cosmos/tests/test_config.py
index f8c2f7832bdb..cc2e78b8e9d2 100644
--- a/sdk/cosmos/azure-cosmos/tests/test_config.py
+++ b/sdk/cosmos/azure-cosmos/tests/test_config.py
@@ -7,17 +7,21 @@
 import unittest
 import uuid
 
+from azure.core.exceptions import AzureError, ServiceRequestError, ServiceResponseError, ClientAuthenticationError
+from azure.core.pipeline.policies import AsyncRetryPolicy, RetryPolicy
+from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk
 from azure.cosmos._retry_utility import _has_database_account_header, _has_read_retryable_headers, _configure_timeout
+from azure.cosmos._routing.routing_range import Range
 from azure.cosmos.cosmos_client import CosmosClient
 from azure.cosmos.exceptions import CosmosHttpResponseError
 from azure.cosmos.http_constants import StatusCodes
-from azure.cosmos.partition_key import PartitionKey
+from azure.cosmos.partition_key import (PartitionKey, _PartitionKeyKind, _PartitionKeyVersion, _Undefined,
+                                        NonePartitionKeyValue)
 from azure.cosmos import (ContainerProxy, DatabaseProxy, documents, exceptions,
-                          http_constants, _retry_utility)
-from azure.core.exceptions import AzureError, ServiceRequestError, ServiceResponseError, ClientAuthenticationError
-from azure.core.pipeline.policies import AsyncRetryPolicy, RetryPolicy
+                          http_constants)
 from devtools_testutils.azure_recorded_testcase import get_credential
 from devtools_testutils.helpers import is_live
+from typing import Sequence, Type, Union
 
 try:
     import urllib3
@@ -525,3 +529,27 @@ async def send(self, request):
        self.update_context(response.context, retry_settings)
 
        return response
+
+def hash_partition_key_value(
+        pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]],
+        kind: str = _PartitionKeyKind.HASH,
+        version: int = _PartitionKeyVersion.V2,
+    ):
+    return PartitionKey._get_hashed_partition_key_string(
+        pk_value=pk_value,
+        
kind=kind, + version=version, + ) + +def create_range(range_min: str, range_max: str, is_min_inclusive: bool = True, is_max_inclusive: bool = False): + if range_max == range_min: + range_max += "FF" + return Range( + range_min=range_min, + range_max=range_max, + isMinInclusive=is_min_inclusive, + isMaxInclusive=is_max_inclusive, + ) + +def create_feed_range_in_dict(feed_range): + return FeedRangeInternalEpk(feed_range).to_dict() diff --git a/sdk/cosmos/azure-cosmos/tests/test_query.py b/sdk/cosmos/azure-cosmos/tests/test_query.py index aa17116b2f39..1dd5cdf76d38 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_query.py +++ b/sdk/cosmos/azure-cosmos/tests/test_query.py @@ -568,6 +568,51 @@ def test_query_request_params_none_retry_policy(self): retry_utility.ExecuteFunction = self.OriginalExecuteFunction self.created_db.delete_container(created_collection.id) + def test_query_positional_args(self): + container = self.created_db.get_container_client(self.config.TEST_MULTI_PARTITION_CONTAINER_ID) + partition_key_value1 = "pk1" + partition_key_value2 = "pk2" + + num_items = 10 + new_items = [] + for pk_value in [partition_key_value1, partition_key_value2]: + for i in range(num_items): + item = { + self.config.TEST_CONTAINER_PARTITION_KEY: pk_value, + 'id': f"{pk_value}_{i}", + 'name': 'sample name' + } + new_items.append(item) + + for item in new_items: + container.upsert_item(body=item) + + query = "SELECT * FROM root r WHERE r.name=@name" + parameters = [{'name': '@name', 'value': 'sample name'}] + partition_key_value = partition_key_value2 + enable_cross_partition_query = True + max_item_count = 3 + enable_scan_in_query = True + populate_query_metrics = True + pager = container.query_items( + query, + parameters, + partition_key_value, + enable_cross_partition_query, + max_item_count, + enable_scan_in_query, + populate_query_metrics, + ).by_page() + + ids = [] + for page in pager: + items = list(page) + num_items = len(items) + for item in items: + assert item['pk'] 
== partition_key_value + ids.append(item['id']) + assert num_items <= max_item_count + assert ids == [item['id'] for item in new_items if item['pk'] == partition_key_value] def _MockExecuteFunctionSessionRetry(self, function, *args, **kwargs): if args: diff --git a/sdk/cosmos/azure-cosmos/tests/test_query_feed_range.py b/sdk/cosmos/azure-cosmos/tests/test_query_feed_range.py new file mode 100644 index 000000000000..83f6bce0d603 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_query_feed_range.py @@ -0,0 +1,131 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +import pytest +import test_config +import unittest +import uuid + +from azure.cosmos import CosmosClient +from itertools import combinations +from azure.cosmos.partition_key import PartitionKey +from typing import List, Mapping, Set + +CONFIG = test_config.TestConfig() +HOST = CONFIG.host +KEY = CONFIG.masterKey +DATABASE_ID = CONFIG.TEST_DATABASE_ID +TEST_NAME = "Query FeedRange " +SINGLE_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_SINGLE_PARTITION_CONTAINER_ID +MULTI_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_MULTI_PARTITION_CONTAINER_ID +TEST_CONTAINERS_IDS = [SINGLE_PARTITION_CONTAINER_ID, MULTI_PARTITION_CONTAINER_ID] +TEST_OFFER_THROUGHPUTS = [CONFIG.THROUGHPUT_FOR_1_PARTITION, CONFIG.THROUGHPUT_FOR_5_PARTITIONS] +PARTITION_KEY = CONFIG.TEST_CONTAINER_PARTITION_KEY +PK_VALUES = ('pk1', 'pk2', 'pk3') +def add_all_pk_values_to_set(items: List[Mapping[str, str]], pk_value_set: Set[str]) -> None: + if len(items) == 0: + return + + pk_values = [item[PARTITION_KEY] for item in items if PARTITION_KEY in item] + pk_value_set.update(pk_values) + +@pytest.fixture(scope="class", autouse=True) +def setup_and_teardown(): + print("Setup: This runs before any tests") + document_definitions = [{PARTITION_KEY: pk, 'id': str(uuid.uuid4())} for pk in PK_VALUES] + database = CosmosClient(HOST, KEY).get_database_client(DATABASE_ID) + + for container_id, 
offer_throughput in zip(TEST_CONTAINERS_IDS, TEST_OFFER_THROUGHPUTS): + container = database.create_container_if_not_exists( + id=container_id, + partition_key=PartitionKey(path='/' + PARTITION_KEY, kind='Hash'), + offer_throughput=offer_throughput) + for document_definition in document_definitions: + container.upsert_item(body=document_definition) + yield + # Code to run after tests + print("Teardown: This runs after all tests") + +def get_container(container_id: str): + client = CosmosClient(HOST, KEY) + db = client.get_database_client(DATABASE_ID) + return db.get_container_client(container_id) + +@pytest.mark.cosmosQuery +class TestQueryFeedRange(): + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + def test_query_with_feed_range_for_all_partitions(self, container_id): + container = get_container(container_id) + query = 'SELECT * from c' + + expected_pk_values = set(PK_VALUES) + actual_pk_values = set() + iter_feed_ranges = list(container.read_feed_ranges()) + for feed_range in iter_feed_ranges: + items = list(container.query_items( + query=query, + enable_cross_partition_query=True, + feed_range=feed_range + )) + add_all_pk_values_to_set(items, actual_pk_values) + assert actual_pk_values == expected_pk_values + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + def test_query_with_feed_range_for_partition_key(self, container_id): + container = get_container(container_id) + query = 'SELECT * from c' + + for pk_value in PK_VALUES: + expected_pk_values = {pk_value} + actual_pk_values = set() + + feed_range = container.feed_range_from_partition_key(pk_value) + items = list(container.query_items( + query=query, + enable_cross_partition_query=True, + feed_range=feed_range + )) + add_all_pk_values_to_set(items, actual_pk_values) + assert actual_pk_values == expected_pk_values + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + def test_query_with_both_feed_range_and_partition_key(self, container_id): + container = 
get_container(container_id) + + expected_error_message = "'feed_range' and 'partition_key' are exclusive parameters, please only set one of them." + query = 'SELECT * from c' + partition_key = PK_VALUES[0] + feed_range = container.feed_range_from_partition_key(partition_key) + with pytest.raises(ValueError) as e: + list(container.query_items( + query=query, + enable_cross_partition_query=True, + feed_range=feed_range, + partition_key=partition_key + )) + assert str(e.value) == expected_error_message + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + def test_query_with_feed_range_for_a_full_range(self, container_id): + container = get_container(container_id) + query = 'SELECT * from c' + + expected_pk_values = set(PK_VALUES) + actual_pk_values = set() + new_range = test_config.create_range( + range_min="", + range_max="FF", + is_min_inclusive=True, + is_max_inclusive=False, + ) + feed_range = test_config.create_feed_range_in_dict(new_range) + items = list(container.query_items( + query=query, + enable_cross_partition_query=True, + feed_range=feed_range + )) + add_all_pk_values_to_set(items, actual_pk_values) + assert expected_pk_values.issubset(actual_pk_values) + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_query_feed_range_async.py b/sdk/cosmos/azure-cosmos/tests/test_query_feed_range_async.py new file mode 100644 index 000000000000..21388291523b --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_query_feed_range_async.py @@ -0,0 +1,138 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+ +import pytest +import pytest_asyncio +import test_config +import unittest +import uuid + +from azure.cosmos.aio import CosmosClient +from itertools import combinations +from azure.cosmos.partition_key import PartitionKey +from typing import List, Mapping, Set + +CONFIG = test_config.TestConfig() +HOST = CONFIG.host +KEY = CONFIG.credential +DATABASE_ID = CONFIG.TEST_DATABASE_ID +TEST_NAME = "Query FeedRange " +SINGLE_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_SINGLE_PARTITION_CONTAINER_ID +MULTI_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_MULTI_PARTITION_CONTAINER_ID +TEST_CONTAINERS_IDS = [SINGLE_PARTITION_CONTAINER_ID, MULTI_PARTITION_CONTAINER_ID] +TEST_OFFER_THROUGHPUTS = [CONFIG.THROUGHPUT_FOR_1_PARTITION, CONFIG.THROUGHPUT_FOR_5_PARTITIONS] +PARTITION_KEY = CONFIG.TEST_CONTAINER_PARTITION_KEY +PK_VALUES = ('pk1', 'pk2', 'pk3') +async def add_all_pk_values_to_set_async(items: List[Mapping[str, str]], pk_value_set: Set[str]) -> None: + if len(items) == 0: + return + + pk_values = [item[PARTITION_KEY] for item in items if PARTITION_KEY in item] + pk_value_set.update(pk_values) + +@pytest_asyncio.fixture(scope="class", autouse=True) +async def setup_and_teardown_async(): + print("Setup: This runs before any tests") + document_definitions = [{PARTITION_KEY: pk, 'id': str(uuid.uuid4())} for pk in PK_VALUES] + database = CosmosClient(HOST, KEY).get_database_client(DATABASE_ID) + + for container_id, offer_throughput in zip(TEST_CONTAINERS_IDS, TEST_OFFER_THROUGHPUTS): + container = await database.create_container_if_not_exists( + id=container_id, + partition_key=PartitionKey(path='/' + PARTITION_KEY, kind='Hash'), + offer_throughput=offer_throughput) + for document_definition in document_definitions: + await container.upsert_item(body=document_definition) + + yield + # Code to run after tests + print("Teardown: This runs after all tests") + +async def get_container(container_id: str): + client = CosmosClient(HOST, KEY) + db = 
client.get_database_client(DATABASE_ID) + return db.get_container_client(container_id) + +@pytest.mark.cosmosQuery +@pytest.mark.asyncio +@pytest.mark.usefixtures("setup_and_teardown_async") +class TestQueryFeedRangeAsync: + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + async def test_query_with_feed_range_for_all_partitions(self, container_id): + container = await get_container(container_id) + query = 'SELECT * from c' + + expected_pk_values = set(PK_VALUES) + actual_pk_values = set() + async for feed_range in container.read_feed_ranges(): + items = [item async for item in + (container.query_items( + query=query, + feed_range=feed_range + ) + )] + await add_all_pk_values_to_set_async(items, actual_pk_values) + assert expected_pk_values == actual_pk_values + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + async def test_query_with_feed_range_for_partition_key(self, container_id): + container = await get_container(container_id) + query = 'SELECT * from c' + + for pk_value in PK_VALUES: + expected_pk_values = {pk_value} + actual_pk_values = set() + + feed_range = await container.feed_range_from_partition_key(pk_value) + items = [item async for item in + (container.query_items( + query=query, + feed_range=feed_range + ) + )] + await add_all_pk_values_to_set_async(items, actual_pk_values) + assert expected_pk_values == actual_pk_values + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + async def test_query_with_both_feed_range_and_partition_key(self, container_id): + container = await get_container(container_id) + + expected_error_message = "'feed_range' and 'partition_key' are exclusive parameters, please only set one of them." 
+ query = 'SELECT * from c' + partition_key = PK_VALUES[0] + feed_range = await container.feed_range_from_partition_key(partition_key) + with pytest.raises(ValueError) as e: + items = [item async for item in + (container.query_items( + query=query, + feed_range=feed_range, + partition_key=partition_key + ) + )] + assert str(e.value) == expected_error_message + + @pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS) + async def test_query_with_feed_range_for_a_full_range(self, container_id): + container = await get_container(container_id) + query = 'SELECT * from c' + + expected_pk_values = set(PK_VALUES) + actual_pk_values = set() + new_range = test_config.create_range( + range_min="", + range_max="FF", + is_min_inclusive=True, + is_max_inclusive=False, + ) + feed_range = test_config.create_feed_range_in_dict(new_range) + items = [item async for item in + (container.query_items( + query=query, + feed_range=feed_range + ) + )] + await add_all_pk_values_to_set_async(items, actual_pk_values) + assert expected_pk_values.issubset(actual_pk_values) + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_utils.py b/sdk/cosmos/azure-cosmos/tests/test_utils.py index 52e155748a77..9586b0548af4 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_utils.py +++ b/sdk/cosmos/azure-cosmos/tests/test_utils.py @@ -5,6 +5,8 @@ import unittest import uuid +import pytest + import azure.cosmos import azure.cosmos._utils as _utils import test_config @@ -34,6 +36,63 @@ def test_connection_string(self): self.assertTrue(db is not None) client.delete_database(db.id) + def test_add_args_to_kwargs(self): + arg_names = ["arg1", "arg2", "arg3", "arg4"] + args = ("arg1_val", "arg2_val", "arg3_val", "arg4_val") + kwargs = {} + + # Test any number of positional arguments less than or equals to the number of argument names + for num_args in range(len(arg_names)): + args = tuple(f"arg{i+1}_val" for i in range(num_args)) + kwargs = {} + 
_utils.add_args_to_kwargs(arg_names, args, kwargs) + + assert len(kwargs.keys()) == len(args) + for arg_name, arg in zip(arg_names, args): + assert arg_name in kwargs + assert kwargs[arg_name] == arg + + # test if arg_name already in kwargs + with pytest.raises(ValueError) as e: + _utils.add_args_to_kwargs(arg_names, args, kwargs) + assert str(e.value) == f"{arg_names[0]} cannot be used as positional and keyword argument at the same time." + + # Test if number of positional argument greater than expected argument names + args = ("arg1_val", "arg2_val", "arg3_val", "arg4_val", "arg5_val") + with pytest.raises(ValueError) as e: + _utils.add_args_to_kwargs(arg_names, args, kwargs) + assert str(e.value) == (f"Positional argument is out of range. Expected {len(arg_names)} arguments, " + f"but got {len(args)} instead. Please review argument list in API documentation.") + + def test_verify_exclusive_arguments(self): + exclusive_keys = ["key1", "key2", "key3", "key4"] + + ## Test valid cases + kwargs = {} + assert _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) is None + + kwargs = {"key1": "test_value"} + assert _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) is None + + kwargs = {"key1": "test_value", "key9": "test_value"} + assert _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) is None + + # Even if some keys are in exclusive_keys list, if the values were 'None' we ignore them + kwargs = {"key1": "test_value", "key2": None, "key3": None} + assert _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) is None + + ## Test invalid cases + kwargs = {"key1": "test_value", "key2": "test_value"} + expected_error_message = "key1, key2 are exclusive parameters, please only set one of them" + with pytest.raises(ValueError) as e: + _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) + assert str(e.value) == expected_error_message + + kwargs = {"key1": "test_value", "key2": "test_value", "key3": "test_value"} + expected_error_message 
= "key1, key2, key3 are exclusive parameters, please only set one of them" + with pytest.raises(ValueError) as e: + _utils.verify_exclusive_arguments(exclusive_keys, **kwargs) + assert str(e.value) == expected_error_message if __name__ == "__main__": unittest.main()