diff --git a/sdk/cosmos/assets.json b/sdk/cosmos/assets.json index 0e743ffd6e..840a173bca 100644 --- a/sdk/cosmos/assets.json +++ b/sdk/cosmos/assets.json @@ -1,6 +1,6 @@ { "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "rust", - "Tag": "rust/azure_data_cosmos_a39b424a5b", + "Tag": "rust/azure_data_cosmos_8671767119", "TagPrefix": "rust/azure_data_cosmos" -} \ No newline at end of file +} diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index dabc9d8683..a9a5c3d871 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -3,30 +3,35 @@ use crate::{ constants, - models::{ContainerProperties, PatchDocument, ThroughputProperties}, + models::{ + ContainerProperties, FeedRange, PartitionKeyRange, PatchDocument, ThroughputProperties, + }, options::{QueryOptions, ReadContainerOptions}, - pipeline::CosmosPipeline, + pipeline::{self, CosmosPipeline}, resource_context::{ResourceLink, ResourceType}, - DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, ReplaceContainerOptions, - ThroughputOptions, + DeleteContainerOptions, FeedPager, FeedRangeOptions, ItemOptions, PartitionKey, Query, + ReplaceContainerOptions, ThroughputOptions, }; use azure_core::http::{ headers::{self}, - request::{options::ContentType, Request}, + request::{ + options::{self, ContentType}, + Request, + }, response::Response, - Method, + Context, Method, RawResponse, }; -use serde::{de::DeserializeOwned, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; /// A client for working with a specific container in a Cosmos DB account. /// /// You can get a `Container` by calling [`DatabaseClient::container_client()`](crate::clients::DatabaseClient::container_client()). #[derive(Clone)] pub struct ContainerClient { - link: ResourceLink, - items_link: ResourceLink, - pipeline: CosmosPipeline, + pub(crate) link: ResourceLink, + pub(crate) pipeline: CosmosPipeline, + pub(crate) items_link: ResourceLink, } impl ContainerClient { @@ -708,8 +713,7 @@ impl ContainerClient { if partition_key.is_empty() { if let Some(query_engine) = options.query_engine.take() { return crate::query::executor::QueryExecutor::new( - self.pipeline.clone(), - self.link.clone(), + self.clone(), query, options, query_engine, @@ -727,4 +731,89 @@ impl ContainerClient { |r| r.insert_headers(&partition_key), ) } + + /// Gets the partition key ranges for this container. + /// + /// This is used internally to retrieve the partition key ranges, which are then + /// mapped to public `FeedRange` objects. + /// + /// This method is used by the query executor to determine the partition key ranges, and is not intended for direct use by applications. + /// Applications should use the [`get_feed_ranges()`](ContainerClient::get_feed_ranges) method to retrieve the partition key ranges as `FeedRange` objects. + pub(crate) async fn get_partition_key_ranges( + &self, + context: Context<'_>, + ) -> azure_core::Result> { + /// Response model for partition key ranges from the Cosmos DB service. + #[derive(Deserialize, Serialize)] + struct PartitionKeyRangesResult { + #[serde(rename = "PartitionKeyRanges")] + pub ranges: Vec, + } + let pkranges_link = self.link.feed(ResourceType::PartitionKeyRanges); + let url = self.pipeline.url(&pkranges_link); + let mut request = Request::new(url, Method::Get); + + let response = self + .pipeline + .send::(context, &mut request, pkranges_link) + .await? + .into_body() + .await?; + + Ok(response.ranges) + } + + /// Gets the feed ranges for this container. + /// + /// Feed ranges represent the partition key ranges for the container and can be used + /// to scope queries or change feed operations to specific subsets of data. + /// + /// # Examples + /// + /// ```rust,no_run + /// # async fn doc() -> Result<(), Box> { + /// # let container_client: azure_data_cosmos::clients::ContainerClient = panic!("this is a non-running example"); + /// let feed_ranges = container_client.get_feed_ranges().await?; + /// for range in feed_ranges { + /// println!("Range: {} to {}", range.min_inclusive, range.max_exclusive); + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn get_feed_ranges( + &self, + options: Option>, + ) -> azure_core::Result> { + let options = options.unwrap_or_default(); + let context = options.method_options.context; + let partition_key_ranges = self.get_partition_key_ranges(context).await?; + + let feed_ranges = partition_key_ranges + .into_iter() + .map(|pkrange| pkrange.range) + .collect(); + + Ok(feed_ranges) + } + + #[tracing::instrument(skip_all)] + pub(crate) async fn get_query_plan( + &self, + context: Context<'_>, + query: &Query, + supported_features: &str, + ) -> azure_core::Result { + let url = self.pipeline.url(&self.items_link); + let mut request = pipeline::create_base_query_request(url, query)?; + request.insert_header(constants::QUERY_ENABLE_CROSS_PARTITION, "True"); + request.insert_header(constants::IS_QUERY_PLAN_REQUEST, "True"); + request.insert_header( + constants::SUPPORTED_QUERY_FEATURES, + supported_features.to_string(), + ); + + self.pipeline + .send_raw(context, &mut request, self.items_link.clone()) + .await + } } diff --git a/sdk/cosmos/azure_data_cosmos/src/models/feed_range.rs b/sdk/cosmos/azure_data_cosmos/src/models/feed_range.rs new file mode 100644 index 0000000000..8f705f5cbd --- /dev/null +++ b/sdk/cosmos/azure_data_cosmos/src/models/feed_range.rs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +use serde::{Deserialize, Serialize}; + +/// Represents a feed range for a container. +/// +/// A feed range represents a contiguous range of partition key values that can be used +/// to scope queries or change feed operations to a specific subset of data. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct FeedRange { + /// The minimum partition key value (inclusive) for this feed range. + pub min_inclusive: String, + /// The maximum partition key value (exclusive) for this feed range. + pub max_exclusive: String, +} + +/// Represents a partition key range with its ID and feed range information. +/// +/// This is used internally to represent the full partition key range information +/// returned by the Cosmos DB service. +/// +/// Partition Key Ranges are only used by the query engine as part of executing queries. +/// Applications should use [`FeedRange`](crate::models::FeedRange) when specifying ranges for queries or change feed operations. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct PartitionKeyRange { + /// The ID of the partition key range. + pub id: String, + /// The feed range information for this partition key range. + #[serde(flatten)] + pub range: FeedRange, +} diff --git a/sdk/cosmos/azure_data_cosmos/src/models/mod.rs b/sdk/cosmos/azure_data_cosmos/src/models/mod.rs index dbd3355383..608b8254cb 100644 --- a/sdk/cosmos/azure_data_cosmos/src/models/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/models/mod.rs @@ -7,12 +7,14 @@ use azure_core::{http::Etag, time::OffsetDateTime}; use serde::{Deserialize, Deserializer, Serialize}; mod container_properties; +mod feed_range; mod indexing_policy; mod partition_key_definition; mod patch_operations; mod throughput_properties; pub use container_properties::*; +pub use feed_range::*; pub use indexing_policy::*; pub use partition_key_definition::*; pub use patch_operations::*; diff --git a/sdk/cosmos/azure_data_cosmos/src/options/mod.rs b/sdk/cosmos/azure_data_cosmos/src/options/mod.rs index 4b49bcecd6..9d82954ce9 100644 --- a/sdk/cosmos/azure_data_cosmos/src/options/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/options/mod.rs @@ -111,3 +111,9 @@ pub struct ReadDatabaseOptions<'a> { pub struct ThroughputOptions<'a> { pub method_options: ClientMethodOptions<'a>, } + +/// Options to be passed to [`ContainerClient::get_feed_ranges()`](crate::clients::ContainerClient::get_feed_ranges()). +#[derive(Clone, Default)] +pub struct FeedRangeOptions<'a> { + pub method_options: ClientMethodOptions<'a>, +} diff --git a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs index 8cfe60e7f1..a7445c080f 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs @@ -1,3 +1,5 @@ +use crate::models::PartitionKeyRange; + /// Represents a request from the query pipeline for data from a specific partition key range. pub struct QueryRequest { /// The ID of the partition key range to query. @@ -53,7 +55,7 @@ pub trait QueryEngine { /// ## Arguments /// * `query` - The query to be executed. /// * `plan` - The JSON-encoded query plan describing the query (usually provided by the gateway). - /// * `pkranges` - The JSON-encoded partition key ranges to be queried (usually provided by the gateway). + /// * `pkranges` - A slice of [`PartitionKeyRange`]s that the query should be executed against. /// /// ## Shared Access /// @@ -65,7 +67,7 @@ pub trait QueryEngine { &self, query: &str, plan: &[u8], - pkranges: &[u8], + pkranges: &[PartitionKeyRange], ) -> azure_core::Result; /// Gets a comma-separates list of features supported by this query engine, suitable for use in the `x-ms-cosmos-supported-query-features` header when requesting a query plan. diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs index b47ca8c52a..8695a0b1d0 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs @@ -1,8 +1,10 @@ -use azure_core::http::{headers::Headers, Context, Method, RawResponse, Request}; -use serde::de::DeserializeOwned; +use azure_core::http::{headers::Headers, Context, Method, RawResponse, Request, Response}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ + clients::ContainerClient, constants, + models::PartitionKeyRange, pipeline::{self, CosmosPipeline}, query::{OwnedQueryPipeline, QueryEngineRef, QueryResult}, resource_context::{ResourceLink, ResourceType}, @@ -10,9 +12,7 @@ use crate::{ }; pub struct QueryExecutor { - http_pipeline: CosmosPipeline, - container_link: ResourceLink, - items_link: ResourceLink, + container_client: ContainerClient, context: Context<'static>, query_engine: QueryEngineRef, base_request: Option, @@ -29,18 +29,14 @@ pub struct QueryExecutor { impl QueryExecutor { pub fn new( - http_pipeline: CosmosPipeline, - container_link: ResourceLink, + container_client: ContainerClient, query: Query, options: QueryOptions<'_>, query_engine: QueryEngineRef, ) -> azure_core::Result { - let items_link = container_link.feed(ResourceType::Items); let context = options.method_options.context.into_owned(); Ok(Self { - http_pipeline, - container_link, - items_link, + container_client, context, query_engine, base_request: None, @@ -76,33 +72,30 @@ impl QueryExecutor { ), None => { // Initialize the pipeline. - let query_plan = get_query_plan( - &self.http_pipeline, - &self.items_link, - Context::with_context(&self.context), - &self.query, - self.query_engine.supported_features()?, - ) - .await? - .into_body() - .collect() - .await?; - let pkranges = get_pkranges( - &self.http_pipeline, - &self.container_link, - Context::with_context(&self.context), - ) - .await? - .into_body() - .collect() - .await?; + let query_plan = self + .container_client + .get_query_plan( + Context::with_context(&self.context), + &self.query, + self.query_engine.supported_features()?, + ) + .await? + .into_body() + .collect() + .await?; + let pkranges = self + .container_client + .get_partition_key_ranges(Context::with_context(&self.context)) + .await?; let pipeline = self.query_engine .create_pipeline(&self.query.text, &query_plan, &pkranges)?; self.query.text = pipeline.query().into(); self.base_request = Some(crate::pipeline::create_base_query_request( - self.http_pipeline.url(&self.items_link), + self.container_client + .pipeline + .url(&self.container_client.items_link), &self.query, )?); self.pipeline = Some(pipeline); @@ -143,11 +136,12 @@ impl QueryExecutor { } let resp = self - .http_pipeline + .container_client + .pipeline .send_raw( Context::with_context(&self.context), &mut query_request, - self.items_link.clone(), + self.container_client.items_link.clone(), ) .await?; @@ -172,42 +166,3 @@ impl QueryExecutor { Ok(None) } } - -// This isn't an inherent method on QueryExecutor because that would force the whole executor to be Sync, which would force the pipeline to be Sync. -#[tracing::instrument(skip_all)] -async fn get_query_plan( - http_pipeline: &CosmosPipeline, - items_link: &ResourceLink, - context: Context<'_>, - query: &Query, - supported_features: &str, -) -> azure_core::Result { - let url = http_pipeline.url(items_link); - let mut request = pipeline::create_base_query_request(url, query)?; - request.insert_header(constants::QUERY_ENABLE_CROSS_PARTITION, "True"); - request.insert_header(constants::IS_QUERY_PLAN_REQUEST, "True"); - request.insert_header( - constants::SUPPORTED_QUERY_FEATURES, - supported_features.to_string(), - ); - - http_pipeline - .send_raw(context, &mut request, items_link.clone()) - .await -} - -// This isn't an inherent method on QueryExecutor because that would force the whole executor to be Sync, which would force the pipeline to be Sync. -#[tracing::instrument(skip_all)] -async fn get_pkranges( - http_pipeline: &CosmosPipeline, - container_link: &ResourceLink, - context: Context<'_>, -) -> azure_core::Result { - let pkranges_link = container_link.feed(ResourceType::PartitionKeyRanges); - let url = http_pipeline.url(&pkranges_link); - let mut base_request = Request::new(url, Method::Get); - - http_pipeline - .send_raw(context, &mut base_request, pkranges_link) - .await -} diff --git a/sdk/cosmos/azure_data_cosmos/tests/cosmos_containers.rs b/sdk/cosmos/azure_data_cosmos/tests/cosmos_containers.rs index 69b2db0d47..6a8091c80b 100644 --- a/sdk/cosmos/azure_data_cosmos/tests/cosmos_containers.rs +++ b/sdk/cosmos/azure_data_cosmos/tests/cosmos_containers.rs @@ -7,12 +7,14 @@ use std::error::Error; use azure_core_test::{recorded, TestContext}; use azure_data_cosmos::{ models::{ - ContainerProperties, IndexingMode, IndexingPolicy, PartitionKeyKind, PropertyPath, - ThroughputProperties, + ContainerProperties, FeedRange, IndexingMode, IndexingPolicy, PartitionKeyKind, + PropertyPath, ThroughputProperties, }, - CreateContainerOptions, Query, + CreateContainerOptions, FeedRangeOptions, Query, }; use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; +use typespec_client_core::{sleep, time::Duration}; use framework::{test_data, TestAccount}; @@ -244,3 +246,42 @@ pub async fn container_crud_hierarchical_pk(context: TestContext) -> Result<(), Ok(()) } + +#[recorded::test] +pub async fn container_feed_ranges_test(context: TestContext) -> Result<(), Box> { + let account = TestAccount::from_env(context, None).await?; + let cosmos_client = account.connect_with_key(None)?; + let db_client = test_data::create_database(&account, &cosmos_client).await?; + + // Create the container with 30,000 RUs of manual throughput + let properties = ContainerProperties { + id: "FeedRanges".into(), + partition_key: "/id".into(), + ..Default::default() + }; + + let throughput = ThroughputProperties::manual(30000); + + db_client + .create_container( + properties.clone(), + Some(CreateContainerOptions { + throughput: Some(throughput), + ..Default::default() + }), + ) + .await? + .into_body() + .await?; + + let container_client = db_client.container_client(&properties.id); + + // Fetch and check the feed ranges. + // We don't make any assumptions about the number of ranges, just that we get results back + let initial_feed_ranges = container_client.get_feed_ranges(None).await?; + assert!(!initial_feed_ranges.is_empty()); + + account.cleanup().await?; + + Ok(()) +} diff --git a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs index c55e7f39d5..913c089073 100644 --- a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs +++ b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs @@ -5,21 +5,12 @@ use std::{collections::VecDeque, sync::Mutex}; use serde::{Deserialize, Serialize}; -use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline}; +use azure_data_cosmos::{ + models::PartitionKeyRange, + query::{PipelineResult, QueryEngine, QueryPipeline}, +}; use serde_json::value::RawValue; -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PartitionKeyRange { - pub id: String, -} - -#[derive(Deserialize)] -struct PkRanges { - #[serde(rename = "PartitionKeyRanges")] - pub ranges: Vec, -} - #[derive(Deserialize)] struct DocumentPayload { #[serde(rename = "Documents")] @@ -65,7 +56,7 @@ impl QueryEngine for MockQueryEngine { &self, query: &str, _plan: &[u8], - pkranges: &[u8], + pkranges: &[PartitionKeyRange], ) -> azure_core::Result> { { if let Some(err) = self.create_error.lock().unwrap().take() { @@ -73,11 +64,8 @@ impl QueryEngine for MockQueryEngine { } } - // Deserialize the partition key ranges. - let pkranges: PkRanges = serde_json::from_slice(pkranges)?; - // Create a mock pipeline with the partition key ranges. - let pipeline = MockQueryPipeline::new(query.to_string(), pkranges.ranges); + let pipeline = MockQueryPipeline::new(query.to_string(), pkranges.to_vec()); Ok(Box::new(pipeline)) }