Skip to content

Commit 6e05278

Browse files
author
Rodrigo Gomez Palacio
committed
ConsistencyManager interface & implementation
Manages read-your-write tokens. The manager works based on conditions & tokens. Tokens are stored in a nested map indexed by a unique id (e.g. `onesignalId`) and a token key (e.g. `USER`). This allows us to track tokens on a per-user basis (e.g. handle switching users). Conditions work by creating a blocking mechanism with customizable token retrieval until a pre-defined condition is met (e.g. at least two specific tokens are available). Also allows extensibility for future applications to control offset blocking mechanism in consistency use-cases.
1 parent b03d5f5 commit 6e05278

File tree

5 files changed

+258
-0
lines changed

5 files changed

+258
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.onesignal.common.consistency.impl
2+
3+
import com.onesignal.common.consistency.models.ICondition
4+
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
5+
import com.onesignal.common.consistency.models.IConsistencyManager
6+
import kotlinx.coroutines.CompletableDeferred
7+
import kotlinx.coroutines.sync.Mutex
8+
import kotlinx.coroutines.sync.withLock
9+
10+
/**
11+
* Manages read-your-write tokens for more accurate segment membership
12+
* calculation. Uses customizable conditions that block retrieval of the newest token until met.
13+
*
14+
* Usage:
15+
* val consistencyManager = ConsistencyManager<MyEnum>()
16+
* val updateConditionDeferred = consistencyManager.registerCondition(MyCustomCondition())
17+
* val rywToken = updateConditionDeferred.await()
18+
*/
19+
class ConsistencyManager : IConsistencyManager {
20+
private val mutex = Mutex()
21+
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, String>> = mutableMapOf()
22+
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<String?>>> =
23+
mutableListOf()
24+
25+
/**
26+
* Set method to update the token based on the key.
27+
* Params:
28+
* id: String - the index of the token map (e.g. onesignalId)
29+
* key: K - corresponds to the operation for which we have a read-your-write token
30+
* value: String? - the token (read-your-write token)
31+
*/
32+
override suspend fun setRywToken(
33+
id: String,
34+
key: IConsistencyKeyEnum,
35+
value: String,
36+
) {
37+
mutex.withLock {
38+
val rywTokens = indexedTokens.getOrPut(id) { mutableMapOf() }
39+
rywTokens[key] = value
40+
checkConditionsAndComplete()
41+
}
42+
}
43+
44+
/**
45+
* Register a condition with its corresponding deferred action. Returns a deferred condition.
46+
*/
47+
override suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?> {
48+
mutex.withLock {
49+
val deferred = CompletableDeferred<String?>()
50+
val pair = Pair(condition, deferred)
51+
conditions.add(pair)
52+
checkConditionsAndComplete()
53+
return deferred
54+
}
55+
}
56+
57+
override suspend fun resolveConditionsWithID(id: String) {
58+
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
59+
60+
for ((condition, deferred) in conditions) {
61+
if (condition.id == id) {
62+
if (!deferred.isCompleted) {
63+
deferred.complete(null)
64+
}
65+
}
66+
completedConditions.add(Pair(condition, deferred))
67+
}
68+
69+
// Remove completed conditions from the list
70+
conditions.removeAll(completedConditions)
71+
}
72+
73+
/**
74+
* IMPORTANT: calling code should be protected by mutex to avoid potential inconsistencies
75+
*/
76+
private fun checkConditionsAndComplete() {
77+
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
78+
79+
for ((condition, deferred) in conditions) {
80+
if (condition.isMet(indexedTokens)) {
81+
val newestToken = condition.getNewestToken(indexedTokens)
82+
if (!deferred.isCompleted) {
83+
deferred.complete(newestToken)
84+
}
85+
completedConditions.add(Pair(condition, deferred))
86+
}
87+
}
88+
89+
// Remove completed conditions from the list
90+
conditions.removeAll(completedConditions)
91+
}
92+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.onesignal.common.consistency.models
2+
3+
interface ICondition {
4+
/**
5+
* Every implementation should define a unique ID & make available via a companion object for
6+
* ease of use
7+
*/
8+
val id: String
9+
10+
/**
11+
* Define a condition that "unblocks" execution
12+
* e.g. we have token (A && B) || A
13+
*/
14+
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean
15+
16+
/**
17+
* Used to process tokens according to their format & return the newest token.
18+
* e.g. numeric strings would be compared differently from JWT tokens
19+
*/
20+
fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String?
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.onesignal.common.consistency.models
2+
3+
interface IConsistencyKeyEnum
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.onesignal.common.consistency.models
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
5+
interface IConsistencyManager {
6+
/**
7+
* Set method to update the RYW token based on the key.
8+
* Params:
9+
* id: String - the index of the RYW token map (e.g., onesignalId)
10+
* key: IConsistencyKeyEnum - corresponds to the operation for which we have a read-your-write token
11+
* value: String? - the read-your-write token
12+
*/
13+
suspend fun setRywToken(
14+
id: String,
15+
key: IConsistencyKeyEnum,
16+
value: String,
17+
)
18+
19+
/**
20+
* Register a condition with its corresponding deferred action. Returns a deferred condition.
21+
* Params:
22+
* condition: ICondition - the condition to be registered
23+
* Returns: CompletableDeferred<String?> - a deferred action that completes when the condition is met
24+
*/
25+
suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?>
26+
27+
/**
28+
* Resolve all conditions with a specific ID
29+
*/
30+
suspend fun resolveConditionsWithID(id: String)
31+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package com.onesignal.common.consistency.impl
2+
3+
import com.onesignal.common.consistency.enums.IamFetchRywTokenKey
4+
import com.onesignal.common.consistency.models.ICondition
5+
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
6+
import io.kotest.core.spec.style.FunSpec
7+
import io.kotest.matchers.shouldBe
8+
import kotlinx.coroutines.test.runTest
9+
10+
class ConsistencyManagerTests : FunSpec({
11+
12+
lateinit var consistencyManager: ConsistencyManager
13+
14+
beforeAny {
15+
consistencyManager = ConsistencyManager()
16+
}
17+
18+
test("setRywToken updates the token correctly") {
19+
runTest {
20+
// Given
21+
val id = "test_id"
22+
val key = IamFetchRywTokenKey.USER
23+
val value = "123"
24+
25+
consistencyManager.setRywToken(id, key, value)
26+
27+
val condition = TestMetCondition(mapOf(id to mapOf(key to value)))
28+
val deferred = consistencyManager.registerCondition(condition)
29+
val result = deferred.await()
30+
31+
result shouldBe value
32+
}
33+
}
34+
35+
test("registerCondition completes when condition is met") {
36+
runTest {
37+
// Given
38+
val id = "test_id"
39+
val key = IamFetchRywTokenKey.USER
40+
val value = "123"
41+
42+
// Set a token to meet the condition
43+
consistencyManager.setRywToken(id, key, value)
44+
45+
val condition = TestMetCondition(mapOf(id to mapOf(key to value)))
46+
val deferred = consistencyManager.registerCondition(condition)
47+
48+
deferred.await()
49+
deferred.isCompleted shouldBe true
50+
}
51+
}
52+
53+
test("registerCondition does not complete when condition is not met") {
54+
runTest {
55+
val condition = TestUnmetCondition()
56+
val deferred = consistencyManager.registerCondition(condition)
57+
58+
consistencyManager.setRywToken("id", IamFetchRywTokenKey.USER, "123")
59+
deferred.isCompleted shouldBe false
60+
}
61+
}
62+
63+
test("resolveConditionsWithID resolves conditions based on ID") {
64+
runTest {
65+
val condition = TestUnmetCondition()
66+
val deferred = consistencyManager.registerCondition(condition)
67+
consistencyManager.resolveConditionsWithID(TestUnmetCondition.ID)
68+
deferred.await()
69+
70+
deferred.isCompleted shouldBe true
71+
}
72+
}
73+
}) {
74+
// Mock implementation of ICondition that simulates a condition that isn't met
75+
private class TestUnmetCondition : ICondition {
76+
companion object {
77+
const val ID = "TestUnmetCondition"
78+
}
79+
80+
override val id: String
81+
get() = ID
82+
83+
override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
84+
return false // Always returns false to simulate an unmet condition
85+
}
86+
87+
override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
88+
return null
89+
}
90+
}
91+
92+
// Mock implementation of ICondition for cases where the condition is met
93+
private class TestMetCondition(
94+
private val expectedRywTokens: Map<String, Map<IConsistencyKeyEnum, String?>>,
95+
) : ICondition {
96+
companion object {
97+
const val ID = "TestMetCondition"
98+
}
99+
100+
override val id: String
101+
get() = ID
102+
103+
override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
104+
return indexedTokens == expectedRywTokens
105+
}
106+
107+
override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
108+
return expectedRywTokens.values.firstOrNull()?.values?.firstOrNull()
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)