Skip to content

add FeedRange APIs #2761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/cosmos/assets.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "rust",
"Tag": "rust/azure_data_cosmos_a39b424a5b",
"Tag": "rust/azure_data_cosmos_8671767119",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corresponding assets repository commit:

Azure/azure-sdk-assets@8671767

"TagPrefix": "rust/azure_data_cosmos"
}
}
113 changes: 101 additions & 12 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,35 @@

use crate::{
constants,
models::{ContainerProperties, PatchDocument, ThroughputProperties},
models::{
ContainerProperties, FeedRange, PartitionKeyRange, PatchDocument, ThroughputProperties,
},
options::{QueryOptions, ReadContainerOptions},
pipeline::CosmosPipeline,
pipeline::{self, CosmosPipeline},
resource_context::{ResourceLink, ResourceType},
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, ReplaceContainerOptions,
ThroughputOptions,
DeleteContainerOptions, FeedPager, FeedRangeOptions, ItemOptions, PartitionKey, Query,
ReplaceContainerOptions, ThroughputOptions,
};

use azure_core::http::{
headers::{self},
request::{options::ContentType, Request},
request::{
options::{self, ContentType},
Request,
},
response::Response,
Method,
Context, Method, RawResponse,
};
use serde::{de::DeserializeOwned, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// A client for working with a specific container in a Cosmos DB account.
///
/// You can get a `Container` by calling [`DatabaseClient::container_client()`](crate::clients::DatabaseClient::container_client()).
#[derive(Clone)]
pub struct ContainerClient {
link: ResourceLink,
items_link: ResourceLink,
pipeline: CosmosPipeline,
pub(crate) link: ResourceLink,
pub(crate) pipeline: CosmosPipeline,
pub(crate) items_link: ResourceLink,
}

impl ContainerClient {
Expand Down Expand Up @@ -708,8 +713,7 @@ impl ContainerClient {
if partition_key.is_empty() {
if let Some(query_engine) = options.query_engine.take() {
return crate::query::executor::QueryExecutor::new(
self.pipeline.clone(),
self.link.clone(),
self.clone(),
query,
options,
query_engine,
Expand All @@ -727,4 +731,89 @@ impl ContainerClient {
|r| r.insert_headers(&partition_key),
)
}

/// Gets the partition key ranges for this container.
///
/// This is used internally to retrieve the partition key ranges, which are then
/// mapped to public `FeedRange` objects.
///
/// This method is used by the query executor to determine the partition key ranges, and is not intended for direct use by applications.
/// Applications should use the [`get_feed_ranges()`](ContainerClient::get_feed_ranges) method to retrieve the partition key ranges as `FeedRange` objects.
pub(crate) async fn get_partition_key_ranges(
&self,
context: Context<'_>,
) -> azure_core::Result<Vec<PartitionKeyRange>> {
/// Response model for partition key ranges from the Cosmos DB service.
#[derive(Deserialize, Serialize)]
struct PartitionKeyRangesResult {
#[serde(rename = "PartitionKeyRanges")]
pub ranges: Vec<PartitionKeyRange>,
}
let pkranges_link = self.link.feed(ResourceType::PartitionKeyRanges);
let url = self.pipeline.url(&pkranges_link);
let mut request = Request::new(url, Method::Get);

let response = self
.pipeline
.send::<PartitionKeyRangesResult>(context, &mut request, pkranges_link)
.await?
.into_body()
.await?;

Ok(response.ranges)
}

/// Gets the feed ranges for this container.
///
/// Feed ranges represent the partition key ranges for the container and can be used
/// to scope queries or change feed operations to specific subsets of data.
///
/// # Examples
///
/// ```rust,no_run
/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
/// # let container_client: azure_data_cosmos::clients::ContainerClient = panic!("this is a non-running example");
/// let feed_ranges = container_client.get_feed_ranges().await?;
/// for range in feed_ranges {
/// println!("Range: {} to {}", range.min_inclusive, range.max_exclusive);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn get_feed_ranges(
&self,
options: Option<FeedRangeOptions<'_>>,
) -> azure_core::Result<Vec<FeedRange>> {
let options = options.unwrap_or_default();
let context = options.method_options.context;
let partition_key_ranges = self.get_partition_key_ranges(context).await?;

let feed_ranges = partition_key_ranges
.into_iter()
.map(|pkrange| pkrange.range)
.collect();

Ok(feed_ranges)
}

#[tracing::instrument(skip_all)]
pub(crate) async fn get_query_plan(
&self,
context: Context<'_>,
query: &Query,
supported_features: &str,
) -> azure_core::Result<RawResponse> {
let url = self.pipeline.url(&self.items_link);
let mut request = pipeline::create_base_query_request(url, query)?;
request.insert_header(constants::QUERY_ENABLE_CROSS_PARTITION, "True");
request.insert_header(constants::IS_QUERY_PLAN_REQUEST, "True");
request.insert_header(
constants::SUPPORTED_QUERY_FEATURES,
supported_features.to_string(),
);

self.pipeline
.send_raw(context, &mut request, self.items_link.clone())
.await
}
}
34 changes: 34 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/models/feed_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use serde::{Deserialize, Serialize};

/// Represents a feed range for a container.
///
/// A feed range represents a contiguous range of partition key values that can be used
/// to scope queries or change feed operations to a specific subset of data.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct FeedRange {
/// The minimum partition key value (inclusive) for this feed range.
pub min_inclusive: String,
/// The maximum partition key value (exclusive) for this feed range.
pub max_exclusive: String,
}

/// Represents a partition key range with its ID and feed range information.
///
/// This is used internally to represent the full partition key range information
/// returned by the Cosmos DB service.
///
/// Partition Key Ranges are only used by the query engine as part of executing queries.
/// Applications should use [`FeedRange`](crate::models::FeedRange) when specifying ranges for queries or change feed operations.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PartitionKeyRange {
/// The ID of the partition key range.
pub id: String,
/// The feed range information for this partition key range.
#[serde(flatten)]
pub range: FeedRange,
}
2 changes: 2 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use azure_core::{http::Etag, time::OffsetDateTime};
use serde::{Deserialize, Deserializer, Serialize};

mod container_properties;
mod feed_range;
mod indexing_policy;
mod partition_key_definition;
mod patch_operations;
mod throughput_properties;

pub use container_properties::*;
pub use feed_range::*;
pub use indexing_policy::*;
pub use partition_key_definition::*;
pub use patch_operations::*;
Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,9 @@ pub struct ReadDatabaseOptions<'a> {
pub struct ThroughputOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}

/// Options to be passed to [`ContainerClient::get_feed_ranges()`](crate::clients::ContainerClient::get_feed_ranges()).
#[derive(Clone, Default)]
pub struct FeedRangeOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
6 changes: 4 additions & 2 deletions sdk/cosmos/azure_data_cosmos/src/query/engine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::models::PartitionKeyRange;

/// Represents a request from the query pipeline for data from a specific partition key range.
pub struct QueryRequest {
/// The ID of the partition key range to query.
Expand Down Expand Up @@ -53,7 +55,7 @@ pub trait QueryEngine {
/// ## Arguments
/// * `query` - The query to be executed.
/// * `plan` - The JSON-encoded query plan describing the query (usually provided by the gateway).
/// * `pkranges` - The JSON-encoded partition key ranges to be queried (usually provided by the gateway).
/// * `pkranges` - A slice of [`PartitionKeyRange`]s that the query should be executed against.
///
/// ## Shared Access
///
Expand All @@ -65,7 +67,7 @@ pub trait QueryEngine {
&self,
query: &str,
plan: &[u8],
pkranges: &[u8],
pkranges: &[PartitionKeyRange],
) -> azure_core::Result<OwnedQueryPipeline>;

/// Gets a comma-separates list of features supported by this query engine, suitable for use in the `x-ms-cosmos-supported-query-features` header when requesting a query plan.
Expand Down
101 changes: 28 additions & 73 deletions sdk/cosmos/azure_data_cosmos/src/query/executor.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use azure_core::http::{headers::Headers, Context, Method, RawResponse, Request};
use serde::de::DeserializeOwned;
use azure_core::http::{headers::Headers, Context, Method, RawResponse, Request, Response};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{
clients::ContainerClient,
constants,
models::PartitionKeyRange,
pipeline::{self, CosmosPipeline},
query::{OwnedQueryPipeline, QueryEngineRef, QueryResult},
resource_context::{ResourceLink, ResourceType},
FeedPage, FeedPager, Query, QueryOptions,
};

pub struct QueryExecutor<T: DeserializeOwned> {
http_pipeline: CosmosPipeline,
container_link: ResourceLink,
items_link: ResourceLink,
container_client: ContainerClient,
context: Context<'static>,
query_engine: QueryEngineRef,
base_request: Option<Request>,
Expand All @@ -29,18 +29,14 @@ pub struct QueryExecutor<T: DeserializeOwned> {

impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
pub fn new(
http_pipeline: CosmosPipeline,
container_link: ResourceLink,
container_client: ContainerClient,
query: Query,
options: QueryOptions<'_>,
query_engine: QueryEngineRef,
) -> azure_core::Result<Self> {
let items_link = container_link.feed(ResourceType::Items);
let context = options.method_options.context.into_owned();
Ok(Self {
http_pipeline,
container_link,
items_link,
container_client,
context,
query_engine,
base_request: None,
Expand Down Expand Up @@ -76,33 +72,30 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
),
None => {
// Initialize the pipeline.
let query_plan = get_query_plan(
&self.http_pipeline,
&self.items_link,
Context::with_context(&self.context),
&self.query,
self.query_engine.supported_features()?,
)
.await?
.into_body()
.collect()
.await?;
let pkranges = get_pkranges(
&self.http_pipeline,
&self.container_link,
Context::with_context(&self.context),
)
.await?
.into_body()
.collect()
.await?;
let query_plan = self
.container_client
.get_query_plan(
Context::with_context(&self.context),
&self.query,
self.query_engine.supported_features()?,
)
.await?
.into_body()
.collect()
.await?;
let pkranges = self
.container_client
.get_partition_key_ranges(Context::with_context(&self.context))
.await?;

let pipeline =
self.query_engine
.create_pipeline(&self.query.text, &query_plan, &pkranges)?;
self.query.text = pipeline.query().into();
self.base_request = Some(crate::pipeline::create_base_query_request(
self.http_pipeline.url(&self.items_link),
self.container_client
.pipeline
.url(&self.container_client.items_link),
&self.query,
)?);
self.pipeline = Some(pipeline);
Expand Down Expand Up @@ -143,11 +136,12 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
}

let resp = self
.http_pipeline
.container_client
.pipeline
.send_raw(
Context::with_context(&self.context),
&mut query_request,
self.items_link.clone(),
self.container_client.items_link.clone(),
)
.await?;

Expand All @@ -172,42 +166,3 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
Ok(None)
}
}

// This isn't an inherent method on QueryExecutor because that would force the whole executor to be Sync, which would force the pipeline to be Sync.
#[tracing::instrument(skip_all)]
async fn get_query_plan(
http_pipeline: &CosmosPipeline,
items_link: &ResourceLink,
context: Context<'_>,
query: &Query,
supported_features: &str,
) -> azure_core::Result<RawResponse> {
let url = http_pipeline.url(items_link);
let mut request = pipeline::create_base_query_request(url, query)?;
request.insert_header(constants::QUERY_ENABLE_CROSS_PARTITION, "True");
request.insert_header(constants::IS_QUERY_PLAN_REQUEST, "True");
request.insert_header(
constants::SUPPORTED_QUERY_FEATURES,
supported_features.to_string(),
);

http_pipeline
.send_raw(context, &mut request, items_link.clone())
.await
}

// This isn't an inherent method on QueryExecutor because that would force the whole executor to be Sync, which would force the pipeline to be Sync.
#[tracing::instrument(skip_all)]
async fn get_pkranges(
http_pipeline: &CosmosPipeline,
container_link: &ResourceLink,
context: Context<'_>,
) -> azure_core::Result<RawResponse> {
let pkranges_link = container_link.feed(ResourceType::PartitionKeyRanges);
let url = http_pipeline.url(&pkranges_link);
let mut base_request = Request::new(url, Method::Get);

http_pipeline
.send_raw(context, &mut base_request, pkranges_link)
.await
}
Loading
Loading