Skip to content

Commit 7d272cb

Browse files
committed
RUM-10140: Add BackPressured Dump information into Telemetry log
1 parent 5376af1 commit 7d272cb

File tree

11 files changed

+317
-12
lines changed

11 files changed

+317
-12
lines changed

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@ package com.datadog.android.core.internal.thread
99
import com.datadog.android.api.InternalLogger
1010
import com.datadog.android.core.configuration.BackPressureMitigation
1111
import com.datadog.android.core.configuration.BackPressureStrategy
12-
import java.util.concurrent.LinkedBlockingQueue
12+
import com.datadog.android.internal.thread.NamedRunnable
1313
import java.util.concurrent.TimeUnit
1414

1515
internal class BackPressuredBlockingQueue<E : Any>(
1616
private val logger: InternalLogger,
1717
private val executorContext: String,
1818
private val backPressureStrategy: BackPressureStrategy
19-
) : LinkedBlockingQueue<E>(
19+
) : ObservableBlockingQueue<E>(
2020
backPressureStrategy.capacity
2121
) {
22+
2223
override fun offer(e: E): Boolean {
2324
return addWithBackPressure(e) {
2425
@Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here
@@ -66,6 +67,13 @@ internal class BackPressuredBlockingQueue<E : Any>(
6667
}
6768

6869
private fun onThresholdReached() {
70+
val dump = dumpQueue()
71+
val backPressureMap = buildMap {
72+
put("capacity", backPressureStrategy.capacity)
73+
if (!dump.isNullOrEmpty()) {
74+
put("dump", dump)
75+
}
76+
}
6977
backPressureStrategy.onThresholdReached()
7078
logger.log(
7179
level = InternalLogger.Level.WARN,
@@ -74,20 +82,20 @@ internal class BackPressuredBlockingQueue<E : Any>(
7482
throwable = null,
7583
onlyOnce = false,
7684
additionalProperties = mapOf(
77-
"backpressure.capacity" to backPressureStrategy.capacity,
85+
"backpressure" to backPressureMap,
7886
"executor.context" to executorContext
7987
)
8088
)
8189
}
8290

8391
private fun onItemDropped(item: E) {
8492
backPressureStrategy.onItemDropped(item)
85-
93+
val name = (item as? NamedRunnable)?.sanitizedName ?: item.toString()
8694
// Note, do not send this to telemetry as it might cause a stack overflow
8795
logger.log(
8896
level = InternalLogger.Level.ERROR,
8997
target = InternalLogger.Target.MAINTAINER,
90-
messageBuilder = { "Dropped item in BackPressuredBlockingQueue queue: $item" },
98+
messageBuilder = { "Dropped item in BackPressuredBlockingQueue queue: $name" },
9199
throwable = null,
92100
onlyOnce = false,
93101
additionalProperties = mapOf(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
3+
* This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
* Copyright 2016-Present Datadog, Inc.
5+
*/
6+
7+
package com.datadog.android.core.internal.thread
8+
9+
import com.datadog.android.internal.thread.NamedRunnable
10+
import java.util.concurrent.LinkedBlockingQueue
11+
import java.util.concurrent.TimeUnit
12+
import java.util.concurrent.atomic.AtomicLong
13+
14+
internal open class ObservableBlockingQueue<E : Any>(
15+
capacity: Int,
16+
private val currentTimeProvider: () -> Long = { System.currentTimeMillis() }
17+
) : LinkedBlockingQueue<E>(capacity) {
18+
19+
private var lastDumpTimestamp: AtomicLong = AtomicLong(0)
20+
21+
fun dumpQueue(): Map<String, Int>? {
22+
val currentTime = currentTimeProvider.invoke()
23+
val last = lastDumpTimestamp.get()
24+
val timeSinceLastBump = currentTime - last
25+
return if (timeSinceLastBump > DUMPING_TIME_INTERVAL_IN_MS) {
26+
if (lastDumpTimestamp.compareAndSet(last, currentTime)) {
27+
buildDumpMap()
28+
} else {
29+
null
30+
}
31+
} else {
32+
null
33+
}
34+
}
35+
36+
private fun buildDumpMap(): Map<String, Int> {
37+
val map = mutableMapOf<String, Int>()
38+
super.toArray().forEach { runnable ->
39+
(runnable as? NamedRunnable)?.sanitizedName?.let {
40+
map.put(it, (map[it] ?: 0) + 1)
41+
}
42+
}
43+
return map
44+
}
45+
46+
companion object {
47+
private val DUMPING_TIME_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(5)
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
3+
* This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
* Copyright 2016-Present Datadog, Inc.
5+
*/
6+
7+
package com.datadog.android.core.internal.thread
8+
9+
import com.datadog.android.internal.thread.NamedRunnable
10+
import com.datadog.android.utils.forge.Configurator
11+
import fr.xgouchet.elmyr.Forge
12+
import fr.xgouchet.elmyr.junit5.ForgeConfiguration
13+
import fr.xgouchet.elmyr.junit5.ForgeExtension
14+
import org.assertj.core.api.Assertions.assertThat
15+
import org.junit.jupiter.api.Test
16+
import org.junit.jupiter.api.extension.ExtendWith
17+
import org.junit.jupiter.api.extension.Extensions
18+
import org.mockito.Mockito.mock
19+
import org.mockito.junit.jupiter.MockitoExtension
20+
import org.mockito.junit.jupiter.MockitoSettings
21+
import org.mockito.quality.Strictness
22+
23+
@Extensions(
24+
ExtendWith(MockitoExtension::class),
25+
ExtendWith(ForgeExtension::class)
26+
)
27+
@MockitoSettings(strictness = Strictness.LENIENT)
28+
@ForgeConfiguration(Configurator::class)
29+
class ObservableBlockingQueueTest {
30+
31+
@Test
32+
fun `M return only once non-null map W try to dump multiple times in dump interval`(
33+
forge: Forge
34+
) {
35+
// Given
36+
val fakeItemCount = forge.aSmallInt()
37+
val fakeFirstTimestamp = forge.aLong(min = 10000)
38+
val fakeSecondTimestamp = fakeFirstTimestamp + forge.aLong(max = 1000)
39+
val fakeThirdTimestamp = fakeSecondTimestamp + forge.aLong(max = 1000)
40+
val fakeTimestamps =
41+
listOf(fakeFirstTimestamp, fakeSecondTimestamp, fakeThirdTimestamp).iterator()
42+
val fakeTimeProvider: () -> Long = { fakeTimestamps.next() }
43+
val testedObservableBlockingQueue =
44+
ObservableBlockingQueue<Any>(fakeItemCount + 1, fakeTimeProvider)
45+
repeat(fakeItemCount) {
46+
val mockItem = mock<Any>()
47+
testedObservableBlockingQueue.offer(mockItem)
48+
}
49+
50+
// When
51+
val firstMap = testedObservableBlockingQueue.dumpQueue()
52+
val secondMap = testedObservableBlockingQueue.dumpQueue()
53+
val thirdMap = testedObservableBlockingQueue.dumpQueue()
54+
55+
// Then
56+
assertThat(firstMap).isNotNull
57+
assertThat(secondMap).isNullOrEmpty()
58+
assertThat(thirdMap).isNullOrEmpty()
59+
}
60+
61+
@Test
62+
fun `M return only twice non-null map W try to dump twice times over dump interval`(
63+
forge: Forge
64+
) {
65+
// Given
66+
val fakeItemCount = forge.aSmallInt()
67+
val fakeFirstTimestamp = forge.aLong(min = 10000)
68+
val fakeSecondTimestamp = fakeFirstTimestamp + forge.aLong(min = 5000)
69+
val fakeTimestamps = listOf(fakeFirstTimestamp, fakeSecondTimestamp).iterator()
70+
val fakeTimeProvider: () -> Long = { fakeTimestamps.next() }
71+
val testedObservableBlockingQueue =
72+
ObservableBlockingQueue<Any>(fakeItemCount + 1, fakeTimeProvider)
73+
repeat(fakeItemCount) {
74+
val mockItem = mock<Any>()
75+
testedObservableBlockingQueue.offer(mockItem)
76+
}
77+
78+
// When
79+
val firstMap = testedObservableBlockingQueue.dumpQueue()
80+
val secondMap = testedObservableBlockingQueue.dumpQueue()
81+
82+
// Then
83+
assertThat(firstMap).isNotNull
84+
assertThat(secondMap).isNotNull
85+
}
86+
87+
@Test
88+
fun `M build correct map W try to dump named runnable`(
89+
forge: Forge
90+
) {
91+
// Given
92+
val expectedMap = mutableMapOf<String, Int>()
93+
val fakeTimeProvider: () -> Long = { forge.aLong(min = 10000L) }
94+
val fakeRunnableCount = forge.anInt(min = 5, max = 100)
95+
val testedObservableBlockingQueue =
96+
ObservableBlockingQueue<Runnable>(fakeRunnableCount + 1, fakeTimeProvider)
97+
val fakeRunnableTypeCount = forge.anInt(min = 1, max = fakeRunnableCount)
98+
val fakeRunnableTypes = mutableListOf<String>()
99+
repeat(fakeRunnableTypeCount) {
100+
fakeRunnableTypes.add(forge.anAlphabeticalString())
101+
}
102+
repeat(fakeRunnableCount) {
103+
val fakeName = forge.anElementFrom(fakeRunnableTypes)
104+
val fakeNamedRunnable = NamedRunnable(fakeName, mock<Runnable>())
105+
testedObservableBlockingQueue.offer(fakeNamedRunnable)
106+
expectedMap[fakeName] = (expectedMap[fakeName] ?: 0) + 1
107+
}
108+
109+
// When
110+
val map = testedObservableBlockingQueue.dumpQueue()
111+
112+
// Then
113+
assertThat(map).isEqualTo(expectedMap)
114+
}
115+
}

dd-sdk-android-internal/api/apiSurface

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ enum com.datadog.android.internal.telemetry.TracingHeaderType
5353
- TRACECONTEXT
5454
data class com.datadog.android.internal.telemetry.TracingHeaderTypesSet
5555
constructor(Set<TracingHeaderType>)
56+
class com.datadog.android.internal.thread.NamedRunnable : Runnable
57+
constructor(String, Runnable)
5658
fun ByteArray.toHexString(): String
5759
object com.datadog.android.internal.utils.ImageViewUtils
5860
fun resolveParentRectAbsPosition(android.view.View, Boolean = true): android.graphics.Rect

dd-sdk-android-internal/api/dd-sdk-android-internal.api

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ public final class com/datadog/android/internal/telemetry/TracingHeaderTypesSet
168168
public fun toString ()Ljava/lang/String;
169169
}
170170

171+
public final class com/datadog/android/internal/thread/NamedRunnable : java/lang/Runnable {
172+
public fun <init> (Ljava/lang/String;Ljava/lang/Runnable;)V
173+
public final fun getSanitizedName ()Ljava/lang/String;
174+
public fun run ()V
175+
}
176+
171177
public final class com/datadog/android/internal/utils/ByteArrayExtKt {
172178
public static final fun toHexString ([B)Ljava/lang/String;
173179
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
3+
* This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
* Copyright 2016-Present Datadog, Inc.
5+
*/
6+
7+
package com.datadog.android.internal.thread
8+
9+
import java.util.Locale
10+
11+
/**
12+
* A wrapper around a [Runnable] that assigns it a sanitized, lowercase name.
13+
*
14+
* This class is useful when you want to associate a human-readable name with a [Runnable],
15+
* for logging, debugging, or tracking purposes.
16+
*
17+
* The provided [name] is sanitized by replacing spaces, colons, periods, and commas
18+
* with underscores (`_`), and converting all characters to lowercase.
19+
*
20+
* @param name The name to associate with this runnable, will be sanitized for safe usage.
21+
* @param runnable The actual runnable to be executed when [run] is called.
22+
*/
23+
class NamedRunnable(name: String, private val runnable: Runnable) : Runnable by runnable {
24+
25+
/**
26+
* Sanitized name after replacing spaces, colons, periods, and commas
27+
* with underscores (`_`), and converting all characters to lowercase.
28+
*/
29+
val sanitizedName: String = name.replace(SanitizedRegex, "_").lowercase(Locale.US)
30+
}
31+
32+
private val SanitizedRegex = "[ :.,]".toRegex()
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
3+
* This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
* Copyright 2016-Present Datadog, Inc.
5+
*/
6+
7+
package com.datadog.internal.thread
8+
9+
import com.datadog.android.internal.thread.NamedRunnable
10+
import com.datadog.tools.unit.forge.BaseConfigurator
11+
import fr.xgouchet.elmyr.Forge
12+
import fr.xgouchet.elmyr.junit5.ForgeConfiguration
13+
import fr.xgouchet.elmyr.junit5.ForgeExtension
14+
import org.assertj.core.api.Assertions.assertThat
15+
import org.junit.jupiter.api.Test
16+
import org.junit.jupiter.api.extension.ExtendWith
17+
import org.junit.jupiter.api.extension.Extensions
18+
import org.mockito.Mockito
19+
import org.mockito.Mockito.mock
20+
import org.mockito.junit.jupiter.MockitoSettings
21+
import org.mockito.kotlin.verify
22+
import org.mockito.kotlin.verifyNoMoreInteractions
23+
import org.mockito.quality.Strictness
24+
25+
@Extensions(
26+
ExtendWith(ForgeExtension::class)
27+
)
28+
@ForgeConfiguration(value = BaseConfigurator::class)
29+
@MockitoSettings(strictness = Strictness.LENIENT)
30+
class NamedRunnableTest {
31+
32+
@Test
33+
fun `M execute the run() from original runnable W initialize with a runnable`(forge: Forge) {
34+
// Given
35+
val mockRunnable = mock<Runnable>()
36+
val fakeName = forge.aString()
37+
38+
// When
39+
val testedNamedRunnable = NamedRunnable(fakeName, mockRunnable)
40+
testedNamedRunnable.run()
41+
42+
// Then
43+
verify(mockRunnable).run()
44+
verifyNoMoreInteractions(mockRunnable)
45+
}
46+
47+
@Test
48+
fun `M return the sanitized name W given not sanitized name`(forge: Forge) {
49+
// Given
50+
val fakeSectionNumber = forge.anInt(1, 10)
51+
val fakeStringList = mutableListOf<String>()
52+
val originalStringBuilder = StringBuilder()
53+
repeat(fakeSectionNumber) {
54+
val fakeName = forge.anAlphabeticalString()
55+
originalStringBuilder.append(fakeName)
56+
val symbol = forge.anElementFrom(
57+
" ",
58+
",",
59+
".",
60+
":"
61+
)
62+
originalStringBuilder.append(symbol)
63+
fakeStringList.add(fakeName)
64+
}
65+
val expectedStringBuilder = StringBuilder()
66+
repeat(fakeSectionNumber) { index ->
67+
expectedStringBuilder.append(fakeStringList[index])
68+
expectedStringBuilder.append("_")
69+
}
70+
val mockRunnable = Mockito.mock<Runnable>()
71+
72+
// When
73+
val testedRunnable = NamedRunnable(originalStringBuilder.toString(), mockRunnable)
74+
75+
// Then
76+
assertThat(testedRunnable.sanitizedName).isEqualTo(expectedStringBuilder.toString())
77+
}
78+
}

detekt_custom.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ datadog:
736736
- "java.util.concurrent.atomic.AtomicInteger.getAndIncrement()"
737737
- "java.util.concurrent.atomic.AtomicInteger.incrementAndGet()"
738738
- "java.util.concurrent.atomic.AtomicInteger.set(kotlin.Int)"
739+
- "java.util.concurrent.atomic.AtomicLong.compareAndSet(kotlin.Long, kotlin.Long)"
739740
- "java.util.concurrent.atomic.AtomicLong.constructor(kotlin.Long)"
740741
- "java.util.concurrent.atomic.AtomicLong.get()"
741742
- "java.util.concurrent.atomic.AtomicLong.set(kotlin.Long)"
@@ -756,6 +757,7 @@ datadog:
756757
- "java.util.concurrent.atomic.AtomicReference.set(io.opentracing.Tracer?)"
757758
- "java.util.concurrent.atomic.AtomicReference.set(kotlin.Nothing?)"
758759
- "java.util.concurrent.atomic.AtomicReference.set(kotlin.String?)"
760+
- "java.util.concurrent.LinkedBlockingQueue.toArray()"
759761
# endregion
760762
# region Java I/O
761763
- "java.io.BufferedReader.readText()"
@@ -1063,6 +1065,7 @@ datadog:
10631065
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.Any)"
10641066
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.Any?)"
10651067
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.String)"
1068+
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.Int)"
10661069
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.Long?)"
10671070
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.Long)"
10681071
- "kotlin.collections.MutableMap.put(kotlin.String, kotlin.collections.Map)"
@@ -1229,8 +1232,10 @@ datadog:
12291232
- "kotlin.Number.toLong()"
12301233
- "kotlin.Short.toUShort()"
12311234
- "kotlin.ULong.toLong()"
1235+
- "kotlin.String.lowercase()"
12321236
- "kotlin.String.takeIf(kotlin.Function1)"
12331237
- "kotlin.String.toBigIntegerOrNull()"
1238+
- "kotlin.String.toRegex()"
12341239
- "kotlin.String.trim(kotlin.Function1)"
12351240
- "kotlin.ULong.toDouble()"
12361241
- "kotlin.UShort.toShort()"

0 commit comments

Comments
 (0)