Skip to content

Commit 31e93c7

Browse files
rashidspmjc1283
authored andcommitted
fix: Event processor fixes and updated config-manager polling interval (#200)
Summary: - Set default Event Dispatcher In case of nil. - Log message for flush_interval validation. - Flush pending events when stop is called. - HTTPProjectConfigManager updates polling_interval validation. Test plan: - Added/updated unit tests.
1 parent a34512a commit 31e93c7

File tree

4 files changed

+58
-12
lines changed

4 files changed

+58
-12
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ The `sdk_key` is used to compose the outbound HTTP request to the default datafi
9898
You can provide an initial datafile to bootstrap the `DataFileProjectConfig` so that it can be used immediately. The initial datafile also serves as a fallback datafile if HTTP connection cannot be established. The initial datafile will be discarded after the first successful datafile poll.
9999
100100
**polling_interval**
101-
The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is between 1 and 2592000 seconds. Default is 5 minutes.
101+
The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is greater than 0 and less than 2592000 seconds. Default is 5 minutes.
102102
103103
**url_template**
104104
A string with placeholder `{sdk_key}` can be provided so that this template along with the provided `sdk_key` is used to form the target URL.

lib/optimizely/config_manager/http_project_config_manager.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def polling_interval(polling_interval)
210210
return
211211
end
212212

213-
unless polling_interval.is_a? Integer
213+
unless polling_interval.is_a? Numeric
214214
@logger.log(
215215
Logger::ERROR,
216216
"Polling interval '#{polling_interval}' has invalid type. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds."
@@ -219,7 +219,7 @@ def polling_interval(polling_interval)
219219
return
220220
end
221221

222-
unless polling_interval.between?(Helpers::Constants::CONFIG_MANAGER['MIN_SECONDS_LIMIT'], Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT'])
222+
unless polling_interval.positive? && polling_interval <= Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT']
223223
@logger.log(
224224
Logger::DEBUG,
225225
"Polling interval '#{polling_interval}' has invalid range. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds."

lib/optimizely/event/batch_event_processor.rb

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor
2525
# the BlockingQueue and buffers them for either a configured batch size or for a
2626
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
2727

28-
attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval
28+
attr_reader :event_queue, :event_dispatcher, :current_batch, :started, :batch_size, :flush_interval
2929

3030
DEFAULT_BATCH_SIZE = 10
3131
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
@@ -36,7 +36,7 @@ class BatchEventProcessor < EventProcessor
3636

3737
def initialize(
3838
event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
39-
event_dispatcher:,
39+
event_dispatcher: Optimizely::EventDispatcher.new,
4040
batch_size: DEFAULT_BATCH_SIZE,
4141
flush_interval: DEFAULT_BATCH_INTERVAL,
4242
logger: NoOpLogger.new,
@@ -51,7 +51,12 @@ def initialize(
5151
@logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
5252
DEFAULT_BATCH_SIZE
5353
end
54-
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
54+
@flush_interval = if positive_number?(flush_interval)
55+
flush_interval
56+
else
57+
@logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
58+
DEFAULT_BATCH_INTERVAL
59+
end
5560
@notification_center = notification_center
5661
@mutex = Mutex.new
5762
@received = ConditionVariable.new
@@ -146,6 +151,16 @@ def run
146151

147152
add_to_batch(item) if item.is_a? Optimizely::UserEvent
148153
end
154+
rescue SignalException
155+
@logger.log(Logger::INFO, 'Interrupted while processing buffer.')
156+
rescue Exception => e
157+
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
158+
ensure
159+
@logger.log(
160+
Logger::INFO,
161+
'Exiting processing loop. Attempting to flush pending events.'
162+
)
163+
flush_queue!
149164
end
150165

151166
def flush_queue!

spec/event/batch_event_processor_spec.rb

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@
110110
expected_batch.pop # Removes 11th element
111111
expect(@event_processor.current_batch.size).to be 10
112112

113-
expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once
113+
expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice
114114
expect(@event_dispatcher).to have_received(:dispatch_event).with(
115115
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
116-
).once
116+
).twice
117117
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
118118
end
119119

@@ -239,18 +239,19 @@
239239
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher)
240240
expect(event_processor.flush_interval).to eq(30_000)
241241
event_processor.stop!
242-
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test')
242+
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger)
243243
expect(event_processor.flush_interval).to eq(30_000)
244244
event_processor.stop!
245-
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [])
245+
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger)
246246
expect(event_processor.flush_interval).to eq(30_000)
247247
event_processor.stop!
248-
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0)
248+
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger)
249249
expect(event_processor.flush_interval).to eq(30_000)
250250
event_processor.stop!
251-
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5)
251+
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger)
252252
expect(event_processor.flush_interval).to eq(30_000)
253253
event_processor.stop!
254+
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times
254255
end
255256

256257
it 'should set flush interval when provided valid' do
@@ -295,4 +296,34 @@
295296
"Error dispatching event: #{log_event} Timeout::Error."
296297
)
297298
end
299+
300+
it 'should flush pending events when stop is called' do
301+
allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args)
302+
expected_batch = []
303+
counter = 0
304+
until counter >= 10
305+
event['key'] = event['key'] + counter.to_s
306+
user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
307+
expected_batch << user_event
308+
@event_processor.process(user_event)
309+
counter += 1
310+
end
311+
312+
sleep 0.25
313+
314+
# max batch size not occurred and batch is not dispatched.
315+
expect(@event_processor.current_batch.size).to be < 10
316+
expect(@event_dispatcher).not_to have_received(:dispatch_event)
317+
318+
# Stop should flush the queue!
319+
@event_processor.stop!
320+
sleep 0.75
321+
322+
expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.')
323+
expect(@event_dispatcher).to have_received(:dispatch_event).with(
324+
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
325+
)
326+
327+
expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!')
328+
end
298329
end

0 commit comments

Comments
 (0)