
Commit e7de99a

Moving to non-blocking UUIDs in product code (#45495)
* Moving to non-blocking UUIDs in product code
* Update Utils.java
* Fixing build break
* Update CHANGELOG.md
* Update CosmosDiagnosticsContext.java
1 parent 521b01d commit e7de99a
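The UUIDs helper referenced throughout these diffs is not itself part of the changeset, so the following is only a hedged sketch of the idea behind it. java.util.UUID.randomUUID() draws from a shared SecureRandom, which can contend on a lock and, depending on the JVM's configured entropy source, block threads that should never block (such as reactive event-loop threads). Because the UUIDs being replaced here are correlation and diagnostics identifiers with no security requirement, a non-blocking variant can assemble a version-4 UUID from ThreadLocalRandom instead. A minimal sketch, assuming ThreadLocalRandom is an acceptable source; only the nonBlockingRandomUUID signature is taken from the diff, the class name and body are hypothetical:

    import java.util.UUID;
    import java.util.concurrent.ThreadLocalRandom;

    public final class NonBlockingUuids {

        private NonBlockingUuids() {
        }

        // Hypothetical stand-in for com.azure.cosmos.implementation.UUIDs:
        // builds a UUID from a per-thread, non-blocking PRNG instead of the
        // shared SecureRandom behind UUID.randomUUID().
        public static UUID nonBlockingRandomUUID() {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            long msb = rnd.nextLong();
            long lsb = rnd.nextLong();
            // Stamp the version (4, random) and variant (IETF) bits so the
            // result has the same layout as UUID.randomUUID() output.
            msb = (msb & 0xFFFFFFFFFFFF0FFFL) | 0x0000000000004000L;
            lsb = (lsb & 0x3FFFFFFFFFFFFFFFL) | 0x8000000000000000L;
            return new UUID(msb, lsb);
        }
    }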

25 files changed: +96 −87 lines

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java

Lines changed: 2 additions & 2 deletions
@@ -6,6 +6,7 @@
 import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
+import com.azure.cosmos.implementation.UUIDs;
 import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
 import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
@@ -31,7 +32,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -158,7 +158,7 @@ private String getThroughputControlClientMetadataCachesSnapshotString() {

     private void readRandomItemFromContainer(CosmosAsyncContainer container) {
         if (container != null) {
-            container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class)
+            container.readItem(UUIDs.nonBlockingRandomUUID().toString(), new PartitionKey(UUIDs.nonBlockingRandomUUID().toString()), JsonNode.class)
                 .onErrorResume(throwable -> {
                     if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) {
                         LOGGER.warn("Failed to read item from container {}", container.getId(), throwable);
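Context for call sites like the one above: readRandomItemFromContainer reads an item that almost certainly does not exist, purely to warm up the client's metadata caches; the expected NotFound error is filtered out in onErrorResume. Since this warm-up runs on latency-sensitive reactive paths, these throwaway id and partition-key values are precisely the kind of randomness that does not justify a potentially blocking SecureRandom call.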

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java

Lines changed: 2 additions & 2 deletions
@@ -7,6 +7,7 @@
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import com.azure.cosmos.implementation.UUIDs;
 import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
@@ -56,7 +57,6 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -601,7 +601,7 @@ private String getThroughputControlClientMetadataCachesSnapshotString() {

     private void readRandomItemFromContainer(CosmosAsyncContainer container) {
         if (container != null) {
-            container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class)
+            container.readItem(UUIDs.nonBlockingRandomUUID().toString(), new PartitionKey(UUIDs.nonBlockingRandomUUID().toString()), JsonNode.class)
                 .onErrorResume(throwable -> {
                     if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) {
                         LOGGER.warn("Failed to read item from container {}", container.getId(), throwable);

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark

-import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -29,7 +29,7 @@ private class ChangeFeedMicroBatchStream

   @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

-  private val correlationActivityId = UUID.randomUUID()
+  private val correlationActivityId = UUIDs.nonBlockingRandomUUID()
   private val streamId = correlationActivityId.toString
   log.logTrace(s"Instantiated ${this.getClass.getSimpleName}.$streamId")


sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark

-import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -29,7 +29,7 @@ private class ChangeFeedMicroBatchStream

   @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

-  private val correlationActivityId = UUID.randomUUID()
+  private val correlationActivityId = UUIDs.nonBlockingRandomUUID()
   private val streamId = correlationActivityId.toString
   log.logTrace(s"Instantiated ${this.getClass.getSimpleName}.$streamId")


sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 3 additions & 3 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 // scalastyle:off underscore.import
-import com.azure.cosmos.implementation.CosmosDaemonThreadFactory
+import com.azure.cosmos.implementation.{CosmosDaemonThreadFactory, UUIDs}
 import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnosticsContext, CosmosEndToEndOperationLatencyPolicyConfigBuilder, CosmosException}
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils
 import com.azure.cosmos.implementation.batch.{BatchRequestResponseConstants, BulkExecutorDiagnosticsTracker, ItemBulkOperation}
@@ -193,7 +193,7 @@ private class BulkWriter
     val executor = new ScheduledThreadPoolExecutor(
       1,
       new CosmosDaemonThreadFactory(
-        "BulkWriterReadManyFlush" + UUID.randomUUID()
+        "BulkWriterReadManyFlush" + UUIDs.nonBlockingRandomUUID()
       ))
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
     executor.setRemoveOnCancelPolicy(true)
@@ -212,7 +212,7 @@ private class BulkWriter
   private def initializeOperationContext(): SparkTaskContext = {
     val taskContext = TaskContext.get

-    val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUID.randomUUID(), "BulkWriter")
+    val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUIDs.nonBlockingRandomUUID(), "BulkWriter")

     if (taskContext != null) {
       val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 import com.azure.cosmos.SparkBridgeInternal
-import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings}
+import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings, UUIDs}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -26,7 +26,7 @@ private class ChangeFeedBatch

   @transient private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)

-  private val correlationActivityId = UUID.randomUUID()
+  private val correlationActivityId = UUIDs.nonBlockingRandomUUID()
   private val batchId = correlationActivityId.toString
   log.logTrace(s"Instantiated ${this.getClass.getSimpleName}")
   private val defaultParallelism = session.sparkContext.defaultParallelism

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 import com.azure.cosmos.CosmosException
-import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot
+import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, UUIDs}
 import com.azure.cosmos.models.PartitionKey
 import com.azure.cosmos.spark.diagnostics.LoggerHelper
 import com.fasterxml.jackson.databind.node.ObjectNode
@@ -162,7 +162,7 @@ private class ChangeFeedTable(val session: SparkSession,

     try {
       container.readItem(
-        UUID.randomUUID().toString, new PartitionKey(UUID.randomUUID().toString), classOf[ObjectNode])
+        UUIDs.nonBlockingRandomUUID().toString, new PartitionKey(UUIDs.nonBlockingRandomUUID().toString), classOf[ObjectNode])
         .block()
     } catch {
       case _: CosmosException =>

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosReadManyReader.scala

Lines changed: 4 additions & 4 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 import com.azure.cosmos.{CosmosException, ReadConsistencyStrategy}
-import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot
+import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, UUIDs}
 import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey}
 import com.azure.cosmos.spark.CosmosPredicates.assertOnSparkDriver
 import com.azure.cosmos.spark.diagnostics.{BasicLoggingTrait, DiagnosticsContext}
@@ -66,8 +66,8 @@ private[spark] class CosmosReadManyReader(
       clientCacheItems(1))
     try {
       container.readItem(
-        UUID.randomUUID().toString,
-        new PartitionKey(UUID.randomUUID().toString),
+        UUIDs.nonBlockingRandomUUID().toString,
+        new PartitionKey(UUIDs.nonBlockingRandomUUID().toString),
         classOf[ObjectNode])
         .block()
     } catch {
@@ -89,7 +89,7 @@ private[spark] class CosmosReadManyReader(
   }

   def readMany(inputRdd: RDD[Row], identityExtraction: Row => CosmosItemIdentity): DataFrame = {
-    val correlationActivityId = UUID.randomUUID()
+    val correlationActivityId = UUIDs.nonBlockingRandomUUID()
     val calledFrom = s"CosmosReadManyReader.readMany($correlationActivityId)"
     val schema = Loan(
       List[Option[CosmosClientCacheItem]](

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsReadOnlyTable.scala

Lines changed: 3 additions & 3 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 import com.azure.cosmos.CosmosException
-import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot
+import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, UUIDs}
 import com.azure.cosmos.models.PartitionKey
 import com.azure.cosmos.spark.CosmosTableSchemaInferrer.{IdAttributeName, RawJsonBodyAttributeName, TimestampAttributeName}
 import com.azure.cosmos.spark.diagnostics.LoggerHelper
@@ -163,8 +163,8 @@ private[spark] class ItemsReadOnlyTable(val sparkSession: SparkSession,
       clientCacheItems(1))
     try {
       container.readItem(
-        UUID.randomUUID().toString,
-        new PartitionKey(UUID.randomUUID().toString),
+        UUIDs.nonBlockingRandomUUID().toString,
+        new PartitionKey(UUIDs.nonBlockingRandomUUID().toString),
         classOf[ObjectNode])
         .block()
     } catch {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBase.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark

-import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
 import com.azure.cosmos.models.{PartitionKeyDefinition, SqlParameter, SqlQuerySpec}
 import com.azure.cosmos.spark.CosmosPredicates.requireNotNull
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
@@ -189,7 +189,7 @@ private[spark] abstract class ItemsScanBase(session: SparkSession,
   }

   override def createReaderFactory(): PartitionReaderFactory = {
-    val correlationActivityId = UUID.randomUUID()
+    val correlationActivityId = UUIDs.nonBlockingRandomUUID()
     log.logInfo(s"Creating ItemsScan with CorrelationActivityId '${correlationActivityId.toString}' for query '${cosmosQuery.queryText}'")
     ItemsScanPartitionReaderFactory(
       config,
