Skip to content

Commit 84d0d60

Browse files
author
Rodrigo Gomez Palacio
authored
Merge pull request #2168 from OneSignal/read-your-write
Read-Your-Write Consistency
2 parents 7e68c1d + 5873b16 commit 84d0d60

File tree

25 files changed

+2130
-1432
lines changed

25 files changed

+2130
-1432
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.onesignal.common.consistency
2+
3+
import com.onesignal.common.consistency.enums.IamFetchRywTokenKey
4+
import com.onesignal.common.consistency.models.ICondition
5+
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
6+
7+
/**
8+
* Used for read your write consistency when fetching In-App Messages.
9+
*
10+
* Params:
11+
* key : String - the index of the RYW token map
12+
*/
13+
class IamFetchReadyCondition(
14+
private val key: String,
15+
) : ICondition {
16+
companion object {
17+
const val ID = "IamFetchReadyCondition"
18+
}
19+
20+
override val id: String
21+
get() = ID
22+
23+
override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
24+
val tokenMap = indexedTokens[key] ?: return false
25+
val userUpdateTokenSet = tokenMap[IamFetchRywTokenKey.USER] != null
26+
27+
/**
28+
* We always update the session count so we know we will have a userUpdateToken. We don't
29+
* necessarily make a subscriptionUpdate call on every session. The following logic
30+
* doesn't consider tokenMap[IamFetchRywTokenKey.SUBSCRIPTION] for this reason. This doesn't
31+
* mean it isn't considered if present when doing the token comparison.
32+
*/
33+
return userUpdateTokenSet
34+
}
35+
36+
override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
37+
val tokenMap = indexedTokens[key] ?: return null
38+
// maxOrNull compares lexicographically
39+
return listOfNotNull(tokenMap[IamFetchRywTokenKey.USER], tokenMap[IamFetchRywTokenKey.SUBSCRIPTION]).maxOrNull()
40+
}
41+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.onesignal.common.consistency.enums
2+
3+
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
4+
5+
/**
6+
* Each enum is a key that we use to keep track of read-your-write tokens.
7+
* Although the enums are named with "UPDATE", they serve as keys for tokens from both PATCH & POST
8+
*/
9+
enum class IamFetchRywTokenKey : IConsistencyKeyEnum {
10+
USER,
11+
SUBSCRIPTION,
12+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.onesignal.common.consistency.impl
2+
3+
import com.onesignal.common.consistency.models.ICondition
4+
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
5+
import com.onesignal.common.consistency.models.IConsistencyManager
6+
import kotlinx.coroutines.CompletableDeferred
7+
import kotlinx.coroutines.sync.Mutex
8+
import kotlinx.coroutines.sync.withLock
9+
10+
/**
11+
* Manages read-your-write tokens for more accurate segment membership
12+
* calculation. Uses customizable conditions that block retrieval of the newest token until met.
13+
*
14+
* Usage:
15+
* val consistencyManager = ConsistencyManager<MyEnum>()
16+
* val updateConditionDeferred = consistencyManager.registerCondition(MyCustomCondition())
17+
* val rywToken = updateConditionDeferred.await()
18+
*/
19+
class ConsistencyManager : IConsistencyManager {
20+
private val mutex = Mutex()
21+
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, String>> = mutableMapOf()
22+
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<String?>>> =
23+
mutableListOf()
24+
25+
/**
26+
* Set method to update the token based on the key.
27+
* Params:
28+
* id: String - the index of the token map (e.g. onesignalId)
29+
* key: K - corresponds to the operation for which we have a read-your-write token
30+
* value: String? - the token (read-your-write token)
31+
*/
32+
override suspend fun setRywToken(
33+
id: String,
34+
key: IConsistencyKeyEnum,
35+
value: String,
36+
) {
37+
mutex.withLock {
38+
val rywTokens = indexedTokens.getOrPut(id) { mutableMapOf() }
39+
rywTokens[key] = value
40+
checkConditionsAndComplete()
41+
}
42+
}
43+
44+
/**
45+
* Register a condition with its corresponding deferred action. Returns a deferred condition.
46+
*/
47+
override suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?> {
48+
mutex.withLock {
49+
val deferred = CompletableDeferred<String?>()
50+
val pair = Pair(condition, deferred)
51+
conditions.add(pair)
52+
checkConditionsAndComplete()
53+
return deferred
54+
}
55+
}
56+
57+
override suspend fun resolveConditionsWithID(id: String) {
58+
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
59+
60+
for ((condition, deferred) in conditions) {
61+
if (condition.id == id) {
62+
if (!deferred.isCompleted) {
63+
deferred.complete(null)
64+
}
65+
}
66+
completedConditions.add(Pair(condition, deferred))
67+
}
68+
69+
// Remove completed conditions from the list
70+
conditions.removeAll(completedConditions)
71+
}
72+
73+
/**
74+
* IMPORTANT: calling code should be protected by mutex to avoid potential inconsistencies
75+
*/
76+
private fun checkConditionsAndComplete() {
77+
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
78+
79+
for ((condition, deferred) in conditions) {
80+
if (condition.isMet(indexedTokens)) {
81+
val newestToken = condition.getNewestToken(indexedTokens)
82+
if (!deferred.isCompleted) {
83+
deferred.complete(newestToken)
84+
}
85+
completedConditions.add(Pair(condition, deferred))
86+
}
87+
}
88+
89+
// Remove completed conditions from the list
90+
conditions.removeAll(completedConditions)
91+
}
92+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.onesignal.common.consistency.models
2+
3+
interface ICondition {
4+
/**
5+
* Every implementation should define a unique ID & make available via a companion object for
6+
* ease of use
7+
*/
8+
val id: String
9+
10+
/**
11+
* Define a condition that "unblocks" execution
12+
* e.g. we have token (A && B) || A
13+
*/
14+
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean
15+
16+
/**
17+
* Used to process tokens according to their format & return the newest token.
18+
* e.g. numeric strings would be compared differently from JWT tokens
19+
*/
20+
fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String?
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.onesignal.common.consistency.models
2+
3+
interface IConsistencyKeyEnum
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.onesignal.common.consistency.models
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
5+
interface IConsistencyManager {
6+
/**
7+
* Set method to update the RYW token based on the key.
8+
* Params:
9+
* id: String - the index of the RYW token map (e.g., onesignalId)
10+
* key: IConsistencyKeyEnum - corresponds to the operation for which we have a read-your-write token
11+
* value: String? - the read-your-write token
12+
*/
13+
suspend fun setRywToken(
14+
id: String,
15+
key: IConsistencyKeyEnum,
16+
value: String,
17+
)
18+
19+
/**
20+
* Register a condition with its corresponding deferred action. Returns a deferred condition.
21+
* Params:
22+
* condition: ICondition - the condition to be registered
23+
* Returns: CompletableDeferred<String?> - a deferred action that completes when the condition is met
24+
*/
25+
suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?>
26+
27+
/**
28+
* Resolve all conditions with a specific ID
29+
*/
30+
suspend fun resolveConditionsWithID(id: String)
31+
}

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/http/impl/HttpClient.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ internal class HttpClient(
184184
}
185185
}
186186

187+
if (headers?.rywToken != null) {
188+
con.setRequestProperty("OneSignal-RYW-Token", headers.rywToken.toString())
189+
}
190+
191+
if (headers?.retryCount != null) {
192+
con.setRequestProperty("Onesignal-Retry-Count", headers.retryCount.toString())
193+
}
194+
195+
if (headers?.sessionDuration != null) {
196+
con.setRequestProperty("OneSignal-Session-Duration", headers.sessionDuration.toString())
197+
}
198+
187199
// Network request is made from getResponseCode()
188200
httpResponse = con.responseCode
189201

@@ -299,9 +311,9 @@ internal class HttpClient(
299311
* Reads the HTTP Retry-Limit from the response.
300312
*/
301313
private fun retryLimitFromResponse(con: HttpURLConnection): Int? {
302-
val retryLimitStr = con.getHeaderField("Retry-Limit")
314+
val retryLimitStr = con.getHeaderField("OneSignal-Retry-Limit")
303315
return if (retryLimitStr != null) {
304-
Logging.debug("HttpClient: Response Retry-After: $retryLimitStr")
316+
Logging.debug("HttpClient: Response OneSignal-Retry-Limit: $retryLimitStr")
305317
retryLimitStr.toIntOrNull()
306318
} else {
307319
null
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
package com.onesignal.core.internal.http.impl
22

33
data class OptionalHeaders(
4+
/**
5+
* Used as an E-Tag
6+
*/
47
val cacheKey: String? = null,
8+
/**
9+
* Used for read your write consistency
10+
*/
11+
val rywToken: String? = null,
12+
/**
13+
* Current retry count
14+
*/
15+
val retryCount: Int? = null,
16+
/**
17+
* Used to track delay between session start and request
18+
*/
19+
val sessionDuration: Long? = null,
520
)

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/session/internal/session/impl/SessionListener.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal class SessionListener(
4040
}
4141

4242
override fun onSessionStarted() {
43-
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId))
43+
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId), true)
4444
}
4545

4646
override fun onSessionActive() {

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/UserModule.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.onesignal.user
22

3+
import com.onesignal.common.consistency.impl.ConsistencyManager
4+
import com.onesignal.common.consistency.models.IConsistencyManager
35
import com.onesignal.common.modules.IModule
46
import com.onesignal.common.services.ServiceBuilder
57
import com.onesignal.core.internal.operations.IOperationExecutor
@@ -34,6 +36,9 @@ import com.onesignal.user.internal.subscriptions.impl.SubscriptionManager
3436

3537
internal class UserModule : IModule {
3638
override fun register(builder: ServiceBuilder) {
39+
// Consistency
40+
builder.register<ConsistencyManager>().provides<IConsistencyManager>()
41+
3742
// Properties
3843
builder.register<PropertiesModelStore>().provides<PropertiesModelStore>()
3944
builder.register<PropertiesModelStoreListener>().provides<IBootstrapService>()

0 commit comments

Comments
 (0)