Skip to content

Commit 6e84924

Browse files
oakbaniMichael Ng
authored andcommitted
refact: Batch Event Processor and Unit Tests (#215)
1 parent 5d3c74d commit 6e84924

File tree

2 files changed

+165
-144
lines changed

2 files changed

+165
-144
lines changed

lib/optimizely/event/batch_event_processor.rb

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
module Optimizely
2121
class BatchEventProcessor < EventProcessor
2222
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
23-
# Events passed to the BatchEventProcessor are immediately added to a EventQueue.
23+
# Events passed to the BatchEventProcessor are immediately added to an EventQueue.
2424
# The BatchEventProcessor maintains a single consumer thread that pulls events off of
2525
# the BlockingQueue and buffers them for either a configured batch size or for a
2626
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
@@ -30,6 +30,7 @@ class BatchEventProcessor < EventProcessor
3030
DEFAULT_BATCH_SIZE = 10
3131
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
3232
DEFAULT_QUEUE_CAPACITY = 1000
33+
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds
3334

3435
FLUSH_SIGNAL = 'FLUSH_SIGNAL'
3536
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -58,8 +59,6 @@ def initialize(
5859
DEFAULT_BATCH_INTERVAL
5960
end
6061
@notification_center = notification_center
61-
@mutex = Mutex.new
62-
@received = ConditionVariable.new
6362
@current_batch = []
6463
@started = false
6564
start!
@@ -71,15 +70,13 @@ def start!
7170
return
7271
end
7372
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
73+
@logger.log(Logger::INFO, 'Starting scheduler.')
7474
@thread = Thread.new { run }
7575
@started = true
7676
end
7777

7878
def flush
79-
@mutex.synchronize do
80-
@event_queue << FLUSH_SIGNAL
81-
@received.signal
82-
end
79+
@event_queue << FLUSH_SIGNAL
8380
end
8481

8582
def process(user_event)
@@ -90,56 +87,38 @@ def process(user_event)
9087
return
9188
end
9289

93-
@mutex.synchronize do
94-
begin
95-
@event_queue << user_event
96-
@received.signal
97-
rescue Exception
98-
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
99-
return
100-
end
90+
begin
91+
@event_queue.push(user_event, true)
92+
rescue Exception
93+
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
94+
return
10195
end
10296
end
10397

10498
def stop!
10599
return unless @started
106100

107-
@mutex.synchronize do
108-
@event_queue << SHUTDOWN_SIGNAL
109-
@received.signal
110-
end
111-
101+
@logger.log(Logger::INFO, 'Stopping scheduler.')
102+
@event_queue << SHUTDOWN_SIGNAL
103+
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
112104
@started = false
113-
@logger.log(Logger::WARN, 'Stopping scheduler.')
114-
@thread.exit
115105
end
116106

117107
private
118108

119109
def run
120110
loop do
121-
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
122-
@logger.log(
123-
Logger::DEBUG,
124-
'Deadline exceeded flushing current batch.'
125-
)
126-
flush_queue!
127-
end
128-
129-
item = nil
111+
flush_queue! if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
130112

131-
@mutex.synchronize do
132-
@received.wait(@mutex, 0.05)
133-
item = @event_queue.pop if @event_queue.length.positive?
134-
end
113+
item = @event_queue.pop if @event_queue.length.positive?
135114

136115
if item.nil?
137116
sleep(0.05)
138117
next
139118
end
140119

141120
if item == SHUTDOWN_SIGNAL
142-
@logger.log(Logger::INFO, 'Received shutdown signal.')
121+
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
143122
break
144123
end
145124

@@ -152,7 +131,7 @@ def run
152131
add_to_batch(item) if item.is_a? Optimizely::UserEvent
153132
end
154133
rescue SignalException
155-
@logger.log(Logger::INFO, 'Interrupted while processing buffer.')
134+
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
156135
rescue Exception => e
157136
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
158137
ensure
@@ -168,6 +147,11 @@ def flush_queue!
168147

169148
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
170149
begin
150+
@logger.log(
151+
Logger::INFO,
152+
'Flushing Queue.'
153+
)
154+
171155
@event_dispatcher.dispatch_event(log_event)
172156
@notification_center&.send_notifications(
173157
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
@@ -192,7 +176,7 @@ def add_to_batch(user_event)
192176
@current_batch << user_event
193177
return unless @current_batch.length >= @batch_size
194178

195-
@logger.log(Logger::DEBUG, 'Flushing on max batch size!')
179+
@logger.log(Logger::DEBUG, 'Flushing on max batch size.')
196180
flush_queue!
197181
end
198182

0 commit comments

Comments
 (0)