Skip to content

Commit 7204c06

Browse files
authored
feat(event processor): change batching to be compliant with Event Processor (#262)
* fix batch as much as possible when non-batch found in queue * flush on batchSize hit * fix event flushes for non-timer * add support for maxQueueSize - configurable * clean up batch on error
1 parent 578f987 commit 7204c06

File tree

13 files changed

+1021
-342
lines changed

13 files changed

+1021
-342
lines changed

OptimizelySwiftSDK.xcodeproj/project.pbxproj

Lines changed: 113 additions & 9 deletions
Large diffs are not rendered by default.

Sources/Customization/DefaultEventDispatcher.swift

Lines changed: 91 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
2828
static let MAX_FAILURE_COUNT = 3
2929

3030
// default timerInterval
31-
var timerInterval: TimeInterval // every minute
31+
var timerInterval: TimeInterval
3232
// default batchSize.
3333
// attempt to send events in batches with batchSize number of events combined
3434
var batchSize: Int
35-
// start trimming the front of the queue when we get to over maxQueueSize
36-
// TODO: implement
37-
var maxQueueSize: Int = 30000
35+
var maxQueueSize: Int
3836

3937
lazy var logger = OPTLoggerFactory.getLogger()
4038
var backingStore: DataStoreType
@@ -47,35 +45,79 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
4745
// timer as a atomic property.
4846
var timer: AtomicProperty<Timer> = AtomicProperty<Timer>()
4947

50-
public init(batchSize: Int = 10, backingStore: DataStoreType = .file, dataStoreName: String = "OPTEventQueue", timerInterval: TimeInterval = 60*1 ) {
51-
self.batchSize = batchSize > 0 ? batchSize : 1
48+
public struct DefaultValues {
49+
static public let batchSize = 10
50+
static public let timeInterval: TimeInterval = 60 // secs
51+
static public let maxQueueSize = 10000
52+
}
53+
54+
public init(batchSize: Int = DefaultValues.batchSize,
55+
backingStore: DataStoreType = .file,
56+
dataStoreName: String = "OPTEventQueue",
57+
timerInterval: TimeInterval = DefaultValues.timeInterval,
58+
maxQueueSize: Int = DefaultValues.maxQueueSize) {
59+
self.batchSize = batchSize > 0 ? batchSize : DefaultValues.batchSize
5260
self.backingStore = backingStore
5361
self.backingStoreName = dataStoreName
5462
self.timerInterval = timerInterval
63+
self.maxQueueSize = maxQueueSize > 100 ? maxQueueSize : DefaultValues.maxQueueSize
5564

5665
switch backingStore {
5766
case .file:
58-
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue", dataStore: DataStoreFile<[Data]>(storeName: backingStoreName))
67+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
68+
dataStore: DataStoreFile<[Data]>(storeName: backingStoreName))
5969
case .memory:
60-
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue", dataStore: DataStoreMemory<[Data]>(storeName: backingStoreName))
70+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
71+
dataStore: DataStoreMemory<[Data]>(storeName: backingStoreName))
6172
case .userDefaults:
62-
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue", dataStore: DataStoreUserDefaults())
73+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
74+
dataStore: DataStoreUserDefaults())
6375
}
6476

77+
78+
if self.maxQueueSize < self.batchSize {
79+
self.logger.e(.eventDispatcherConfigError("batchSize cannot be bigger than maxQueueSize"))
80+
self.maxQueueSize = self.batchSize
81+
}
82+
83+
addProjectChangeNotificationObservers()
84+
6585
subscribe()
6686
}
6787

6888
deinit {
69-
timer.performAtomic { (timer) in
70-
timer.invalidate()
71-
}
89+
stopTimer()
90+
7291
unsubscribe()
7392
}
7493

94+
func addProjectChangeNotificationObservers() {
95+
NotificationCenter.default.addObserver(forName: .didReceiveOptimizelyProjectIdChange, object: nil, queue: nil) { (notif) in
96+
self.logger.d("Event flush triggered by datafile projectId change")
97+
self.flushEvents()
98+
}
99+
100+
NotificationCenter.default.addObserver(forName: .didReceiveOptimizelyRevisionChange, object: nil, queue: nil) { (notif) in
101+
self.logger.d("Event flush triggered by datafile revision change")
102+
self.flushEvents()
103+
}
104+
}
105+
75106
open func dispatchEvent(event: EventForDispatch, completionHandler: DispatchCompletionHandler?) {
107+
guard dataStore.count < maxQueueSize else {
108+
let error = OptimizelyError.eventDispatchFailed("EventQueue is full")
109+
self.logger.e(error)
110+
completionHandler?(.failure(error))
111+
return
112+
}
113+
76114
dataStore.save(item: event)
77115

78-
setTimer()
116+
if dataStore.count >= batchSize {
117+
flushEvents()
118+
} else {
119+
startTimer()
120+
}
79121

80122
completionHandler?(.success(event.body))
81123
}
@@ -88,92 +130,50 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
88130
dispatcher.async {
89131
// we don't remove anthing off of the queue unless it is successfully sent.
90132
var failureCount = 0
91-
// if we can't batch the events because they are not from the same project or
92-
// are being sent to a different url. we set the batchSizeHolder to batchSize
93-
// and batchSize to 1 until we have sent the last batch that couldn't be batched.
94-
var batchSizeHolder = 0
95-
// the batch send count if the events failed to be batched.
96-
var sendCount = 0
97-
98-
let failedBatch = { () -> Void in
99-
// hold the batch size
100-
batchSizeHolder = self.batchSize
101-
// set it to 1 until the last batch that couldn't be batched is sent
102-
self.batchSize = 1
103-
}
104133

105-
let resetBatch = { () -> Void in
106-
if batchSizeHolder != 0 {
107-
self.batchSize = batchSizeHolder
108-
sendCount = 0
109-
batchSizeHolder = 0
134+
func removeStoredEvents(num: Int) {
135+
if let removedItem = self.dataStore.removeFirstItems(count: num), removedItem.count > 0 {
136+
// avoid event-log-message preparation overheads with closure-logging
137+
self.logger.d({ "Removed stored \(num) events starting with \(removedItem.first!)" })
138+
} else {
139+
self.logger.e("Failed to removed \(num) events")
110140
}
111-
112141
}
142+
113143
while let eventsToSend: [EventForDispatch] = self.dataStore.getFirstItems(count: self.batchSize) {
114-
let actualEventsSize = eventsToSend.count
115-
var eventToSend = eventsToSend.batch()
116-
if eventToSend != nil {
117-
// we merged the event and ready for batch
118-
// if the bacth size is not equal to the actual event size,
119-
// then setup the batchSizeHolder to be the size of the event.
120-
if actualEventsSize != self.batchSize {
121-
batchSizeHolder = self.batchSize
122-
self.batchSize = actualEventsSize
123-
sendCount = actualEventsSize - 1
124-
}
125-
} else {
126-
failedBatch()
127-
// just send the first one and let the rest be sent until sendCount == batchSizeHolder
128-
eventToSend = eventsToSend.first
129-
}
144+
let (numEvents, batched) = eventsToSend.batch()
130145

131-
guard let event = eventToSend else {
132-
self.logger.e(.eventBatchFailed)
133-
resetBatch()
134-
break
135-
}
146+
guard numEvents > 0 else { break }
147+
148+
guard let batchEvent = batched else {
149+
// discard an invalid event that causes batching failure
150+
// - if an invalid event is found while batching, it batches all the valid ones before the invalid one and sends it out.
151+
// - when trying to batch next, it finds the invalid one at the header. It discards that specific invalid one and continue batching next ones.
136152

153+
removeStoredEvents(num: 1)
154+
continue
155+
}
156+
137157
// we've exhuasted our failure count. Give up and try the next time a event
138158
// is queued or someone calls flush.
139159
if failureCount > DefaultEventDispatcher.MAX_FAILURE_COUNT {
140160
self.logger.e(.eventSendRetyFailed(failureCount))
141-
failureCount = 0
142-
resetBatch()
143161
break
144162
}
145-
163+
146164
// make the send event synchronous. enter our notify
147165
self.notify.enter()
148-
self.sendEvent(event: event) { (result) -> Void in
166+
self.sendEvent(event: batchEvent) { (result) -> Void in
149167
switch result {
150168
case .failure(let error):
151169
self.logger.e(error.reason)
152170
failureCount += 1
153171
case .success:
154172
// we succeeded. remove the batch size sent.
155-
if let removedItem: [EventForDispatch] = self.dataStore.removeFirstItems(count: self.batchSize) {
156-
if self.batchSize == 1 && removedItem.first != event {
157-
self.logger.e("Removed event different from sent event")
158-
} else {
159-
// avoid event-log-message preparation overheads with closure-logging
160-
self.logger.d({ "Successfully sent event: \(event)" })
161-
}
162-
} else {
163-
self.logger.e("Removed event nil for sent item")
164-
}
173+
removeStoredEvents(num: numEvents)
174+
165175
// reset failureCount
166176
failureCount = 0
167-
// did we have to send a batch one at a time?
168-
if batchSizeHolder != 0 {
169-
sendCount += 1
170-
// have we sent all the events in this batch?
171-
if sendCount == self.batchSize {
172-
resetBatch()
173-
}
174-
} else {
175-
// batch had batchSize items
176-
}
177177
}
178178
// our send is done.
179179
self.notify.leave()
@@ -183,7 +183,6 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
183183
self.notify.wait()
184184
}
185185
}
186-
187186
}
188187

189188
open func sendEvent(event: EventForDispatch, completionHandler: @escaping DispatchCompletionHandler) {
@@ -210,21 +209,18 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
210209
}
211210

212211
func applicationDidEnterBackground() {
213-
timer.performAtomic { (timer) in
214-
timer.invalidate()
215-
}
216-
timer.property = nil
212+
stopTimer()
217213

218214
flushEvents()
219215
}
220216

221217
func applicationDidBecomeActive() {
222218
if dataStore.count > 0 {
223-
setTimer()
219+
startTimer()
224220
}
225221
}
226222

227-
func setTimer() {
223+
func startTimer() {
228224
// timer is activated only for iOS10+ and non-zero interval value
229225
guard #available(iOS 10.0, tvOS 10.0, *), timerInterval > 0 else {
230226
flushEvents()
@@ -237,18 +233,22 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
237233
// should check here again
238234
guard self.timer.property == nil else { return }
239235

240-
self.timer.property = Timer.scheduledTimer(withTimeInterval: self.timerInterval, repeats: true) { (timer) in
236+
self.timer.property = Timer.scheduledTimer(withTimeInterval: self.timerInterval, repeats: true) { _ in
241237
self.dispatcher.async {
242-
if self.dataStore.count == 0 {
243-
self.timer.performAtomic {(timer) in
244-
timer.invalidate()
245-
}
246-
self.timer.property = nil
247-
} else {
238+
if self.dataStore.count > 0 {
248239
self.flushEvents()
240+
} else {
241+
self.stopTimer()
249242
}
250243
}
251244
}
252245
}
253246
}
247+
248+
func stopTimer() {
249+
timer.performAtomic { (timer) in
250+
timer.invalidate()
251+
}
252+
timer.property = nil
253+
}
254254
}

Sources/Data Model/ProjectConfig.swift

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,13 @@ class ProjectConfig {
8282
return project.experiments + project.groups.map({$0.experiments}).flatMap({$0})
8383
}()
8484

85+
// MARK: - Init
86+
8587
init(datafile: Data) throws {
8688
do {
8789
self.project = try JSONDecoder().decode(Project.self, from: datafile)
90+
91+
ProjectConfig.observer.update(project: project)
8892
} catch {
8993
throw OptimizelyError.dataFileInvalid
9094
}
@@ -96,13 +100,51 @@ class ProjectConfig {
96100

97101
convenience init(datafile: String) throws {
98102
try self.init(datafile: Data(datafile.utf8))
99-
}
103+
}
100104

101-
init() {
105+
init() {}
106+
}
107+
108+
// MARK: - Project Change Observer
109+
110+
extension ProjectConfig {
111+
112+
struct ProjectObserver {
113+
var projectId: String? {
114+
didSet {
115+
if oldValue != nil, projectId != oldValue {
116+
NotificationCenter.default.post(name: .didReceiveOptimizelyProjectIdChange, object: nil)
117+
}
118+
}
119+
}
120+
121+
var revision: String? {
122+
didSet {
123+
if oldValue != nil, revision != oldValue {
124+
NotificationCenter.default.post(name: .didReceiveOptimizelyRevisionChange, object: nil)
125+
}
126+
}
127+
}
128+
129+
/// update obseverable properties
130+
///
131+
/// - Parameter project: new Project values (pass nil for reset)
132+
mutating func update(project: Project?) {
133+
self.projectId = project?.projectId
134+
self.revision = project?.revision
135+
}
136+
137+
mutating func reset() {
138+
self.update(project: nil)
139+
}
102140
}
103141

142+
static var observer = ProjectObserver()
143+
104144
}
105145

146+
// MARK: - Persistent Data
147+
106148
extension ProjectConfig {
107149
private func whitelistUser(userId: String, experimentId: String, variationId: String) {
108150
var dic = whitelistUsers[userId] ?? [String: String]()
@@ -127,7 +169,6 @@ extension ProjectConfig {
127169
// old versions (< 4) of datafiles not supported
128170
return ["4"].contains(version)
129171
}
130-
131172
}
132173

133174
// MARK: - Project Access

0 commit comments

Comments
 (0)