Skip to content

Commit 80e8618

Browse files
(fix): refactor batch_processor to start on first process and use mutex to wait till deadline if nothing in the queue (#232)
* initial commit of refactor * fix project spec * 2.4.4 is too quick * trying to get 2.4.4 queue to fill * cleanup one test * update to look for debug statement * reduce size of max queue
1 parent 86a7aae commit 80e8618

File tree

5 files changed

+83
-66
lines changed

5 files changed

+83
-66
lines changed

.rubocop.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ Style/SignalException:
4646
Enabled: false
4747

4848
Lint/RescueException:
49-
Enabled: false
49+
Enabled: true
5050

5151
Layout/EndOfLine:
5252
EnforcedStyle: lf

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ rvm:
99
- 2.5.1
1010
- 2.6.0
1111
before_install:
12-
- gem update --system
12+
# - gem update --system
1313
- gem install bundler
1414
install:
1515
- bundle install

lib/optimizely/event/batch_event_processor.rb

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ class BatchEventProcessor < EventProcessor
3131
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
3232
DEFAULT_QUEUE_CAPACITY = 1000
3333
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds
34-
MAX_NIL_COUNT = 3
3534

3635
FLUSH_SIGNAL = 'FLUSH_SIGNAL'
3736
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -62,7 +61,7 @@ def initialize(
6261
@notification_center = notification_center
6362
@current_batch = []
6463
@started = false
65-
start!
64+
@stopped = false
6665
end
6766

6867
def start!
@@ -72,26 +71,37 @@ def start!
7271
end
7372
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
7473
@logger.log(Logger::INFO, 'Starting scheduler.')
75-
@thread = Thread.new { run }
74+
if @wait_mutex.nil?
75+
@wait_mutex = Mutex.new
76+
@resource = ConditionVariable.new
77+
end
78+
@thread = Thread.new { run_queue }
7679
@started = true
80+
@stopped = false
7781
end
7882

7983
def flush
8084
@event_queue << FLUSH_SIGNAL
85+
@wait_mutex.synchronize { @resource.signal }
8186
end
8287

8388
def process(user_event)
8489
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
8590

86-
if !@started || !@thread.alive?
91+
# if the processor has been explicitly stopped. Don't accept tasks
92+
if @stopped
8793
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
8894
return
8995
end
9096

97+
# start if the processor hasn't been started
98+
start! unless @started
99+
91100
begin
92101
@event_queue.push(user_event, true)
93-
rescue Exception
94-
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
102+
@wait_mutex.synchronize { @resource.signal }
103+
rescue => e
104+
@logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message)
95105
return
96106
end
97107
end
@@ -101,42 +111,20 @@ def stop!
101111

102112
@logger.log(Logger::INFO, 'Stopping scheduler.')
103113
@event_queue << SHUTDOWN_SIGNAL
114+
@wait_mutex.synchronize { @resource.signal }
104115
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
105116
@started = false
117+
@stopped = true
106118
end
107119

108120
private
109121

110-
def run
111-
# if we receive a number of item nils that reach MAX_NIL_COUNT,
112-
# then we hang on the pop via setting use_pop to false
113-
@nil_count = 0
114-
# hang on pop if true
115-
@use_pop = false
116-
loop do
117-
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
118-
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
119-
flush_queue!
120-
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
121-
@use_pop = true if @nil_count > MAX_NIL_COUNT
122-
end
123-
124-
item = @event_queue.pop if @event_queue.length.positive? || @use_pop
125-
126-
if item.nil?
127-
# when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available.
128-
# this avoids to much spinning of the loop.
129-
@nil_count += 1
130-
next
131-
end
132-
133-
# reset nil_count and use_pop if we have received an item.
134-
@nil_count = 0
135-
@use_pop = false
136-
122+
def process_queue
123+
while @event_queue.length.positive?
124+
item = @event_queue.pop
137125
if item == SHUTDOWN_SIGNAL
138126
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
139-
break
127+
return false
140128
end
141129

142130
if item == FLUSH_SIGNAL
@@ -147,15 +135,35 @@ def run
147135

148136
add_to_batch(item) if item.is_a? Optimizely::UserEvent
149137
end
138+
true
139+
end
140+
141+
def run_queue
142+
loop do
143+
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
144+
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
145+
146+
break unless process_queue
147+
148+
flush_queue!
149+
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
150+
end
151+
152+
break unless process_queue
153+
154+
# what is the current interval to flush in seconds
155+
interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001
156+
157+
next unless interval.positive?
158+
159+
@wait_mutex.synchronize { @resource.wait(@wait_mutex, interval) }
160+
end
150161
rescue SignalException
151162
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
152-
rescue Exception => e
163+
rescue => e
153164
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
154165
ensure
155-
@logger.log(
156-
Logger::INFO,
157-
'Exiting processing loop. Attempting to flush pending events.'
158-
)
166+
@logger.log(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.')
159167
flush_queue!
160168
end
161169

spec/event/batch_event_processor_spec.rb

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
let(:event) { project_config.get_event_from_key('test_event') }
3333

3434
before(:example) do
35+
spy_logger = spy('logger')
3536
@event_queue = SizedQueue.new(100)
3637
@event_dispatcher = Optimizely::EventDispatcher.new
3738
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
@@ -46,6 +47,8 @@
4647
it 'should log waring when service is already started' do
4748
@event_processor = Optimizely::BatchEventProcessor.new(logger: spy_logger)
4849
@event_processor.start!
50+
@event_processor.start!
51+
4952
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once
5053
end
5154

@@ -61,22 +64,23 @@
6164
)
6265

6366
@event_processor.process(conversion_event)
64-
# flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched.
65-
sleep 1
67+
# flush interval is set to 100ms. Wait for 200ms and assert that event is dispatched.
68+
sleep 0.2
6669

6770
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
6871
expect(@notification_center).to have_received(:send_notifications).with(
6972
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
7073
log_event
7174
).once
7275
expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Flushing Queue.').once
76+
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_most(2).times
7377
end
7478

7579
it 'should flush the current batch when max batch size met' do
7680
@event_processor = Optimizely::BatchEventProcessor.new(
7781
event_dispatcher: @event_dispatcher,
7882
batch_size: 11,
79-
flush_interval: 100_000,
83+
flush_interval: 10_000,
8084
logger: spy_logger
8185
)
8286

@@ -92,8 +96,9 @@
9296
end
9397

9498
# Wait until other thread has processed the event.
95-
until @event_processor.event_queue.empty?; end
96-
until @event_processor.current_batch.empty?; end
99+
sleep 0.1 until @event_processor.event_queue.empty?
100+
101+
sleep 0.1 until @event_processor.current_batch.empty?
97102

98103
expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once
99104
expect(@event_dispatcher).to have_received(:dispatch_event).with(
@@ -109,7 +114,7 @@
109114
@event_processor = Optimizely::BatchEventProcessor.new(
110115
event_queue: @event_queue,
111116
event_dispatcher: @event_dispatcher,
112-
flush_interval: 100_000,
117+
flush_interval: 10_000,
113118
logger: spy_logger
114119
)
115120

@@ -120,8 +125,9 @@
120125
@event_processor.flush
121126

122127
# Wait until other thread has processed the event.
123-
until @event_processor.event_queue.empty?; end
124-
until @event_processor.current_batch.empty?; end
128+
sleep 0.1 until @event_processor.event_queue.empty?
129+
130+
sleep 0.1 until @event_processor.current_batch.empty?
125131

126132
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice
127133
expect(@event_processor.event_queue.length).to eq(0)
@@ -143,13 +149,13 @@
143149
expect(user_event1.event_context[:revision]).to eq('1')
144150
@event_processor.process(user_event1)
145151
# Wait until other thread has processed the event.
146-
while @event_processor.current_batch.length != 1; end
152+
sleep 0.1 while @event_processor.current_batch.length != 1
147153

148154
expect(user_event2.event_context[:revision]).to eq('2')
149155
@event_processor.process(user_event2)
150156
@event_processor.process(user_event2)
151157
# Wait until other thread has processed the event.
152-
while @event_processor.current_batch.length != 2; end
158+
sleep 0.1 while @event_processor.current_batch.length != 2
153159

154160
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
155161
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once
@@ -170,13 +176,13 @@
170176
expect(user_event1.event_context[:project_id]).to eq('X')
171177
@event_processor.process(user_event1)
172178
# Wait until other thread has processed the event.
173-
while @event_processor.current_batch.length != 1; end
179+
sleep 0.1 while @event_processor.current_batch.length != 1
174180

175181
expect(user_event2.event_context[:project_id]).to eq('Y')
176182
@event_processor.process(user_event2)
177183
@event_processor.process(user_event2)
178184
# Wait until other thread has processed the event.
179-
while @event_processor.current_batch.length != 2; end
185+
sleep 0.1 while @event_processor.current_batch.length != 2
180186

181187
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
182188
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once
@@ -252,10 +258,11 @@
252258
@event_processor.process(conversion_event)
253259

254260
# Wait until other thread has processed the event.
255-
while @event_processor.current_batch.length != 1; end
261+
sleep 0.1 while @event_processor.current_batch.length != 1
262+
256263
@event_processor.flush
257264
# Wait until other thread has processed the event.
258-
until @event_processor.current_batch.empty?; end
265+
sleep 0.1 until @event_processor.current_batch.empty?
259266

260267
expect(@notification_center).to have_received(:send_notifications).with(
261268
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
@@ -281,10 +288,11 @@
281288

282289
@event_processor.process(conversion_event)
283290
# Wait until other thread has processed the event.
284-
while @event_processor.current_batch.length != 1; end
291+
sleep 0.1 while @event_processor.current_batch.length != 1
292+
285293
@event_processor.flush
286294
# Wait until other thread has processed the event.
287-
until @event_processor.current_batch.empty?; end
295+
sleep 0.1 until @event_processor.current_batch.empty?
288296

289297
expect(@notification_center).not_to have_received(:send_notifications)
290298
expect(spy_logger).to have_received(:log).once.with(
@@ -315,7 +323,7 @@
315323
end
316324

317325
# Wait until other thread has processed the event.
318-
while @event_processor.current_batch.length != 4; end
326+
sleep 0.1 while @event_processor.current_batch.length != 4
319327
expect(@event_dispatcher).not_to have_received(:dispatch_event)
320328

321329
@event_processor.stop!
@@ -329,32 +337,31 @@
329337

330338
it 'should log a warning when Queue gets full' do
331339
@event_processor = Optimizely::BatchEventProcessor.new(
332-
event_queue: SizedQueue.new(10),
340+
event_queue: SizedQueue.new(5),
333341
event_dispatcher: @event_dispatcher,
334-
batch_size: 100,
335-
flush_interval: 100_000,
342+
batch_size: 1000,
343+
flush_interval: 10_000,
336344
logger: spy_logger
337345
)
338346

339347
user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
340-
11.times do
348+
900.times do
341349
@event_processor.process(user_event)
342350
end
343351

344-
# Wait until other thread has processed the event.
345-
while @event_processor.current_batch.length != 10; end
346352
expect(@event_dispatcher).not_to have_received(:dispatch_event)
347-
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue.').once
353+
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').at_least(:once)
348354
end
349355

350356
it 'should not process and log when Executor is not running' do
351357
@event_processor = Optimizely::BatchEventProcessor.new(
352358
event_dispatcher: @event_dispatcher,
353359
batch_size: 100,
354-
flush_interval: 100_000,
360+
flush_interval: 10_000,
355361
logger: spy_logger
356362
)
357363

364+
@event_processor.start!
358365
@event_processor.stop!
359366

360367
user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)

spec/project_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2814,6 +2814,8 @@ def callback(_args); end
28142814
project_instance = Optimizely::Project.new(nil, nil, nil, nil, true, nil, nil, config_manager, nil, event_processor)
28152815

28162816
expect(config_manager.stopped).to be false
2817+
expect(event_processor.started).to be false
2818+
event_processor.start!
28172819
expect(event_processor.started).to be true
28182820

28192821
project_instance.close

0 commit comments

Comments
 (0)