
Commit 765e14a

Allow usage in non-public clouds (Azure#45310)

* Allow usage in non-public clouds
* Changelogs
* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala
* Update ClientMetricsTest.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

1 parent a63036a · commit 765e14a

File tree: 9 files changed (+154, −14 lines)

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md (3 additions, 0 deletions)

@@ -2,6 +2,9 @@
 
 ### 4.37.2 (2025-05-13)
 
+#### Features Added
+* Added option to use the connector in non-public Azure clouds. - See [PR 45310](https://github.com/Azure/azure-sdk-for-java/pull/45310)
+
 #### Bugs Fixed
 * Fixed an issue during bulk write operations that could result in failing the Spark job in `BulkWriter.flushAndClose` too eagerly in certain cases. - See [PR 44992](https://github.com/Azure/azure-sdk-for-java/pull/44992)
 * Fixed hang issue in `CosmosPagedIterable#handle` by preventing race conditions in underlying subscription of `Flux<FeedResponse>`. - [PR 45290](https://github.com/Azure/azure-sdk-for-java/pull/45290)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md (3 additions, 0 deletions)

@@ -2,6 +2,9 @@
 
 ### 4.37.2 (2025-05-13)
 
+#### Features Added
+* Added option to use the connector in non-public Azure clouds. - See [PR 45310](https://github.com/Azure/azure-sdk-for-java/pull/45310)
+
 #### Bugs Fixed
 * Fixed an issue during bulk write operations that could result in failing the Spark job in `BulkWriter.flushAndClose` too eagerly in certain cases. - See [PR 44992](https://github.com/Azure/azure-sdk-for-java/pull/44992)
 * Fixed hang issue in `CosmosPagedIterable#handle` by preventing race conditions in underlying subscription of `Flux<FeedResponse>`. - [PR 45290](https://github.com/Azure/azure-sdk-for-java/pull/45290)

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md (3 additions, 0 deletions)

@@ -2,6 +2,9 @@
 
 ### 4.37.2 (2025-05-13)
 
+#### Features Added
+* Added option to use the connector in non-public Azure clouds. - See [PR 45310](https://github.com/Azure/azure-sdk-for-java/pull/45310)
+
 #### Bugs Fixed
 * Fixed an issue during bulk write operations that could result in failing the Spark job in `BulkWriter.flushAndClose` too eagerly in certain cases. - See [PR 44992](https://github.com/Azure/azure-sdk-for-java/pull/44992)
 * Fixed hang issue in `CosmosPagedIterable#handle` by preventing race conditions in underlying subscription of `Flux<FeedResponse>`. - [PR 45290](https://github.com/Azure/azure-sdk-for-java/pull/45290)

sdk/cosmos/azure-cosmos-spark_3_2-12/docs/AAD-Auth.md (7 additions, 0 deletions)

@@ -33,6 +33,13 @@ To enable managed identity support out-of-the-box, the Spark environment needs t
 | `spark.cosmos.account.tenantId` | None | The `AAD TenantId` of the Azure Cosmos DB account resource specified under `spark.cosmos.accountEndpoint`. This parameter is required for all management operations when using AAD / Microsoft Entra ID authentication. |
 | `spark.cosmos.account.resourceGroupName` | None | The simple resource group name (not the fully qualified one) of the Azure Cosmos DB account resource specified under `spark.cosmos.accountEndpoint`. This parameter is required for all management operations when using AAD / Microsoft Entra ID authentication. |
 
+#### Non-public clouds
+For non-public clouds, the `spark.cosmos.account.azureEnvironment` config value needs to be set to `Custom`, and the config entries `spark.cosmos.account.azureEnvironment.management` and `spark.cosmos.account.azureEnvironment.aad` have to be set to the correct values for the non-public cloud.
+
+| Config Property Name | Default | Description |
+|:----------------------------------------------------|:--------|:------------|
+| `spark.cosmos.account.azureEnvironment.management` | None | The Uri of the ARM (Resource Manager) endpoint in the custom cloud - e.g. the value corresponding to `https://management.azure.com/` in the public cloud. |
+| `spark.cosmos.account.azureEnvironment.aad` | None | The Uri of the AAD endpoint in the custom cloud - e.g. the value corresponding to `https://login.microsoftonline.com/` in the public cloud. |
 
 #### Environment variables or system properties

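To make the documentation change above concrete, here is an illustrative Spark config map for a non-public cloud. This is a sketch only: every account identifier and endpoint URI below is a placeholder for the target cloud, not a value taken from this PR.

```scala
// Illustrative sketch: all identifiers and URIs are placeholders for the
// target non-public cloud; substitute the real values for your environment.
val nonPublicCloudConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.<custom-cloud-dns-suffix>:443/",
  "spark.cosmos.auth.type" -> "ServicePrincipal",
  "spark.cosmos.account.subscriptionId" -> "<subscription-id>",
  "spark.cosmos.account.tenantId" -> "<tenant-id>",
  "spark.cosmos.account.resourceGroupName" -> "<resource-group>",
  "spark.cosmos.auth.aad.clientId" -> "<client-id>",
  "spark.cosmos.auth.aad.clientSecret" -> "<client-secret>",
  // The three entries introduced by this PR for non-public clouds:
  "spark.cosmos.account.azureEnvironment" -> "Custom",
  "spark.cosmos.account.azureEnvironment.management" -> "https://management.<custom-cloud>/",
  "spark.cosmos.account.azureEnvironment.aad" -> "https://login.<custom-cloud>/"
)
```
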
sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md (12 additions, 10 deletions)

@@ -2,16 +2,18 @@
 
 
 ## Generic Configuration
-| Config Property Name | Default | Description |
-|:------------------------------------------|:--------|:------------|
-| `spark.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
-| `spark.cosmos.accountKey` | None | Cosmos DB Account Key |
-| `spark.cosmos.database` | None | Cosmos DB database name |
-| `spark.cosmos.container` | None | Cosmos DB container name |
-| `spark.cosmos.account.subscriptionId` | None | The subscriptionId of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
-| `spark.cosmos.account.tenantId` | None | The tenantId of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
-| `spark.cosmos.account.resourceGroupName` | None | The resource group of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
-| `spark.cosmos.account.azureEnvironment` | `Azure` | The azure environment of the Cosmos DB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. |
+| Config Property Name | Default | Description |
+|:----------------------------------------------------|:--------|:------------|
+| `spark.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
+| `spark.cosmos.accountKey` | None | Cosmos DB Account Key |
+| `spark.cosmos.database` | None | Cosmos DB database name |
+| `spark.cosmos.container` | None | Cosmos DB container name |
+| `spark.cosmos.account.subscriptionId` | None | The subscriptionId of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
+| `spark.cosmos.account.tenantId` | None | The tenantId of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
+| `spark.cosmos.account.resourceGroupName` | None | The resource group of the Cosmos DB account. Required for `ServicePrincipal` authentication. |
+| `spark.cosmos.account.azureEnvironment` | `Azure` | The Azure environment of the Cosmos DB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany` or `Custom`. When using `Custom` (only needed for non-public clouds), the config entries `spark.cosmos.account.azureEnvironment.management` and `spark.cosmos.account.azureEnvironment.aad` also have to be specified. |
+| `spark.cosmos.account.azureEnvironment.management` | None | The Uri of the ARM (Resource Manager) endpoint in the custom cloud - e.g. the value corresponding to `https://management.azure.com/` in the public cloud. |
+| `spark.cosmos.account.azureEnvironment.aad` | None | The Uri of the AAD endpoint in the custom cloud - e.g. the value corresponding to `https://login.microsoftonline.com/` in the public cloud. |
 
 ### AAD Auth Config
 | Config Property Name | Default | Description |

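As a follow-up to the reference table above, a minimal read path using these generic options might look like the sketch below. It assumes an active `SparkSession` named `spark`, reuses the placeholder `nonPublicCloudConfig` map from the earlier sketch, and uses `cosmos.oltp`, the connector's data source name.

```scala
// Minimal read sketch, assuming an active SparkSession named `spark` and the
// placeholder `nonPublicCloudConfig` map from the sketch above.
val readCfg = nonPublicCloudConfig ++ Map(
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>"
)

val df = spark.read
  .format("cosmos.oltp")   // data source name of the Cosmos DB Spark connector
  .options(readCfg)
  .load()

df.show(10)
```
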
sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala (45 additions, 1 deletion)

@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, NumericType, StructType}
 import java.net.{URI, URISyntaxException, URL}
 import java.time.format.DateTimeFormatter
 import java.time.{Duration, Instant}
+import java.util
 import java.util.{Locale, ServiceLoader}
 import scala.collection.concurrent.TrieMap
 import scala.collection.immutable.{HashSet, List, Map}

@@ -48,6 +49,8 @@ private[spark] object CosmosConfigNames {
   val TenantId = "spark.cosmos.account.tenantId"
   val ResourceGroupName = "spark.cosmos.account.resourceGroupName"
   val AzureEnvironment = "spark.cosmos.account.azureEnvironment"
+  val AzureEnvironmentAAD = "spark.cosmos.account.azureEnvironment.aad"
+  val AzureEnvironmentManagement = "spark.cosmos.account.azureEnvironment.management"
   val AuthType = "spark.cosmos.auth.type"
   val ClientId = "spark.cosmos.auth.aad.clientId"
   val ResourceId = "spark.cosmos.auth.aad.resourceId"

@@ -160,6 +163,8 @@ private[spark] object CosmosConfigNames {
     ClientCertPemBase64,
     ClientCertSendChain,
     AzureEnvironment,
+    AzureEnvironmentAAD,
+    AzureEnvironmentManagement,
     Database,
     Container,
     PreferredRegionsList,

@@ -615,6 +620,18 @@ private object CosmosAccountConfig extends BasicLoggingTrait {
     parseFromStringFunction = resourceGroupName => resourceGroupName,
     helpMessage = "The resource group of the CosmosDB account. Required for `ServicePrincipal` authentication.")
 
+  private val AzureEnvironmentManagementUri = CosmosConfigEntry[String](key = CosmosConfigNames.AzureEnvironmentManagement,
+    defaultValue = None,
+    mandatory = false,
+    parseFromStringFunction = managementUri => managementUri,
+    helpMessage = "The ARM management endpoint to be used when selecting AzureEnvironment `Custom`.")
+
+  private val AzureEnvironmentAadUri = CosmosConfigEntry[String](key = CosmosConfigNames.AzureEnvironmentAAD,
+    defaultValue = None,
+    mandatory = false,
+    parseFromStringFunction = aadUri => aadUri,
+    helpMessage = "The AAD endpoint to be used when selecting AzureEnvironment `Custom`.")
+
   private val AzureEnvironmentTypeEnum = CosmosConfigEntry[java.util.Map[String, String]](key = CosmosConfigNames.AzureEnvironment,
     defaultValue = Option.apply(AzureEnvironment.AZURE.getEndpoints),
     mandatory = false,

@@ -671,7 +688,6 @@ private object CosmosAccountConfig extends BasicLoggingTrait {
     val subscriptionIdOpt = CosmosConfigEntry.parse(cfg, SubscriptionId)
     val resourceGroupNameOpt = CosmosConfigEntry.parse(cfg, ResourceGroupName)
     val tenantIdOpt = CosmosConfigEntry.parse(cfg, TenantId)
-    val azureEnvironmentOpt = CosmosConfigEntry.parse(cfg, AzureEnvironmentTypeEnum)
     val clientBuilderInterceptors = CosmosConfigEntry.parse(cfg, ClientBuilderInterceptors)
     val clientInterceptors = CosmosConfigEntry.parse(cfg, ClientInterceptors)
 
@@ -683,6 +699,34 @@ private object CosmosAccountConfig extends BasicLoggingTrait {
       SparkBridgeImplementationInternal.configureSimpleObjectMapper(true)
     }
 
+    val azureEnvironmentOpt : Option[util.Map[String, String]] = if (cfg.exists(kvp =>
+      CosmosConfigNames.AzureEnvironment.equalsIgnoreCase(kvp._1)
+        && "Custom".equalsIgnoreCase(kvp._2))) {
+
+      val endpoints: util.Map[String, String] = new util.HashMap[String, String]()
+      val mgmtEndpoint = CosmosConfigEntry.parse(cfg, AzureEnvironmentManagementUri)
+      if (mgmtEndpoint.isDefined) {
+        endpoints.put("resourceManagerEndpointUrl", mgmtEndpoint.get)
+      } else {
+        throw new IllegalArgumentException(
+          s"The configuration '${CosmosConfigNames.AzureEnvironmentManagement}' is required when "
+            + "choosing AzureEnvironment 'Custom'.")
+      }
+
+      val aadEndpoint = CosmosConfigEntry.parse(cfg, AzureEnvironmentAadUri)
+      if (aadEndpoint.isDefined) {
+        endpoints.put("activeDirectoryEndpointUrl", aadEndpoint.get)
+      } else {
+        throw new IllegalArgumentException(
+          s"The configuration '${CosmosConfigNames.AzureEnvironmentAAD}' is required when "
+            + "choosing AzureEnvironment 'Custom'.")
+      }
+
+      Option.apply(endpoints)
+    } else {
+      CosmosConfigEntry.parse(cfg, AzureEnvironmentTypeEnum)
+    }
+
     // parsing above already validated these assertions
     assert(endpointOpt.isDefined, s"Parameter '${CosmosConfigNames.AccountEndpoint}' (Uri) is missing.")
     assert(accountName.isDefined, s"Parameter '${CosmosConfigNames.AccountEndpoint}' is missing.")

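To summarize the behavior of the parsing change above outside the connector's internals, here is a simplified, standalone sketch: when the environment is `Custom`, both endpoint overrides are mandatory and are folded into an endpoint map; otherwise the predefined environment is used. The names `resolveAzureEnvironmentEndpoints`, `requiredValue`, and `predefinedEndpoints` are illustrative, not connector APIs, and the predefined branch is stubbed out.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Simplified, standalone sketch of the resolution rule introduced in CosmosConfig.scala.
// All names in this object are illustrative, not part of the connector's API.
object AzureEnvironmentResolutionSketch {

  def resolveAzureEnvironmentEndpoints(cfg: Map[String, String]): JMap[String, String] = {
    val environment = cfg
      .find { case (key, _) => key.equalsIgnoreCase("spark.cosmos.account.azureEnvironment") }
      .map(_._2)
      .getOrElse("Azure")

    if (environment.equalsIgnoreCase("Custom")) {
      // Both overrides are required for a custom cloud; fail fast if either is missing.
      val endpoints = new JHashMap[String, String]()
      endpoints.put("resourceManagerEndpointUrl",
        requiredValue(cfg, "spark.cosmos.account.azureEnvironment.management"))
      endpoints.put("activeDirectoryEndpointUrl",
        requiredValue(cfg, "spark.cosmos.account.azureEnvironment.aad"))
      endpoints
    } else {
      predefinedEndpoints(environment)
    }
  }

  private def requiredValue(cfg: Map[String, String], key: String): String =
    cfg
      .find { case (k, _) => k.equalsIgnoreCase(key) }
      .map(_._2)
      .getOrElse(throw new IllegalArgumentException(
        s"The configuration '$key' is required when choosing AzureEnvironment 'Custom'."))

  // Stub for the predefined environments (Azure, AzureChina, AzureUsGovernment, AzureGermany).
  private def predefinedEndpoints(name: String): JMap[String, String] =
    new JHashMap[String, String]()
}
```
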
sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala (55 additions, 0 deletions)

@@ -227,6 +227,61 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
     }
   }
 
+  "Config Parser" should "parse custom azure environment" in {
+
+    for (authType <- Array("ServicePrinciple", "ServicePrincipal")) {
+      val userConfig = Map(
+        "spark.cosmos.accountEndpoint" -> "https://boson-test.documents.azure.com:443/",
+        "spark.cosmos.auth.type" -> authType,
+        "spark.cosmos.account.subscriptionId" -> testAccountSubscriptionId,
+        "spark.cosmos.account.tenantId" -> testAccountTenantId,
+        "spark.cosmos.account.resourceGroupName" -> testAccountResourceGroupName,
+        "spark.cosmos.auth.aad.clientId" -> testServicePrincipalClientId,
+        "spark.cosmos.auth.aad.clientSecret" -> testServicePrincipalClientSecret,
+        "spark.cosmos.account.azureEnvironment" -> "CuSTom",
+        "spark.cosmos.account.azureEnvironment.AaD" -> "CustomAadEndpoint",
+        "spark.cosmos.account.azureEnvironment.mANagement" -> "CustomARMEndpoint"
+      )
+
+      val userCfgMissingAadEndpoint = userConfig.toMap.filter { case (key, _) => key != "spark.cosmos.account.azureEnvironment.AaD" }
+      try {
+        CosmosAccountConfig.parseCosmosAccountConfig(userCfgMissingAadEndpoint)
+        throw new IllegalStateException("Should never reach here when AAD endpoint config is missing")
+      } catch {
+        case _: IllegalArgumentException =>
+        case otherError: Throwable => throw otherError
+      }
+
+      val userCfgMissingArmEndpoint = userConfig.toMap.filterKeys(_ != "spark.cosmos.account.azureEnvironment.mANagement")
+      try {
+        CosmosAccountConfig.parseCosmosAccountConfig(userCfgMissingArmEndpoint)
+        throw new IllegalStateException("Should never reach here when ARM endpoint config is missing")
+      } catch {
+        case _: IllegalArgumentException =>
+        case otherError: Throwable => throw otherError
+      }
+
+      val endpointConfig = CosmosAccountConfig.parseCosmosAccountConfig(userConfig)
+
+      endpointConfig.endpoint shouldEqual sampleProdEndpoint
+
+      val servicePrincipalAuthConfig = endpointConfig.authConfig.asInstanceOf[CosmosServicePrincipalAuthConfig]
+      endpointConfig.subscriptionId.get shouldEqual testAccountSubscriptionId
+      servicePrincipalAuthConfig.tenantId shouldEqual testAccountTenantId
+      endpointConfig.resourceGroupName.get shouldEqual testAccountResourceGroupName
+      servicePrincipalAuthConfig.clientId shouldEqual testServicePrincipalClientId
+      servicePrincipalAuthConfig.clientSecret.isDefined shouldEqual true
+      servicePrincipalAuthConfig.clientSecret.get shouldEqual testServicePrincipalClientSecret
+      servicePrincipalAuthConfig.clientCertPemBase64.isDefined shouldEqual false
+      servicePrincipalAuthConfig.sendChain shouldEqual false
+      endpointConfig.accountName shouldEqual "boson-test"
+      endpointConfig.azureEnvironmentEndpoints should not be null
+      endpointConfig.azureEnvironmentEndpoints.size() shouldEqual 2
+      endpointConfig.azureEnvironmentEndpoints.get("activeDirectoryEndpointUrl") shouldEqual "CustomAadEndpoint"
+      endpointConfig.azureEnvironmentEndpoints.get("resourceManagerEndpointUrl") shouldEqual "CustomARMEndpoint"
+    }
+  }
+
   it should "validate account endpoint" in {
     val userConfig = Map(
       "spark.cosmos.accountEndpoint" -> "invalidUrl",

sdk/cosmos/azure-cosmos-spark_3_2-12/test-databricks/notebooks/basicScenarioAad.scala (7 additions, 1 deletion)

@@ -23,6 +23,9 @@ val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
   "spark.cosmos.account.subscriptionId" -> subscriptionId,
   "spark.cosmos.account.tenantId" -> tenantId,
   "spark.cosmos.account.resourceGroupName" -> resourceGroupName,
+  "spark.cosmos.account.azureEnvironment" -> "Custom",
+  "spark.cosmos.account.azureEnvironment.management" -> "https://management.azure.com/",
+  "spark.cosmos.account.azureEnvironment.aad" -> "https://login.microsoftonline.com/",
   "spark.cosmos.auth.aad.clientId" -> clientId,
   "spark.cosmos.auth.aad.clientSecret" -> clientSecret,
   "spark.cosmos.database" -> cosmosDatabaseName,

@@ -40,7 +43,10 @@ val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEnd
   "spark.cosmos.database" -> cosmosDatabaseName,
   "spark.cosmos.container" -> cosmosContainerName,
   "spark.cosmos.read.inferSchema.enabled" -> "true",
-  "spark.cosmos.enforceNativeTransport" -> "true"
+  "spark.cosmos.enforceNativeTransport" -> "true",
+  "spark.cosmos.account.azureEnvironment" -> "Custom",
+  "spark.cosmos.account.azureEnvironment.management" -> "https://management.azure.com/",
+  "spark.cosmos.account.azureEnvironment.aad" -> "https://login.microsoftonline.com/"
 )
 
 // COMMAND ----------
