Skip to content

Commit c6a2bfe

Browse files
rashidspMichael Ng
authored andcommitted
feat(eventProcessor): Add EventProcessor and BatchEventProcessor (#191)
1 parent 7accf32 commit c6a2bfe

File tree

7 files changed

+503
-8
lines changed

7 files changed

+503
-8
lines changed

.rubocop.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,6 @@ Style/RescueStandardError:
4444

4545
Style/SignalException:
4646
Enabled: false
47+
48+
Lint/RescueException:
49+
Enabled: false

.rubocop_todo.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ Lint/LiteralAsCondition:
2121
Metrics/ParameterLists:
2222
Max: 6
2323
Exclude:
24-
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
2524
- 'lib/optimizely.rb'
26-
- 'lib/optimizely/optimizely_factory.rb'
27-
- 'lib/optimizely/event/entity/impression_event.rb'
28-
- 'lib/optimizely/event/entity/snapshot_event.rb'
25+
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
26+
- 'lib/optimizely/event/batch_event_processor.rb'
2927
- 'lib/optimizely/event/entity/conversion_event.rb'
3028
- 'lib/optimizely/event/entity/event_context.rb'
29+
- 'lib/optimizely/event/entity/impression_event.rb'
30+
- 'lib/optimizely/event/entity/snapshot_event.rb'
31+
- 'lib/optimizely/optimizely_factory.rb'
3132

3233
Naming/AccessorMethodName:
3334
Exclude:
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
# frozen_string_literal: true
2+
3+
#
4+
# Copyright 2019, Optimizely and contributors
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
require_relative 'event_processor'
19+
require_relative '../helpers/validator'
20+
module Optimizely
21+
class BatchEventProcessor < EventProcessor
22+
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
23+
# Events passed to the BatchEventProcessor are immediately added to a EventQueue.
24+
# The BatchEventProcessor maintains a single consumer thread that pulls events off of
25+
# the BlockingQueue and buffers them for either a configured batch size or for a
26+
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
27+
28+
attr_reader :event_queue, :current_batch, :batch_size, :flush_interval
29+
30+
DEFAULT_BATCH_SIZE = 10
31+
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
32+
DEFAULT_QUEUE_CAPACITY = 1000
33+
34+
FLUSH_SIGNAL = 'FLUSH_SIGNAL'
35+
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
36+
37+
def initialize(
38+
event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
39+
event_dispatcher:,
40+
batch_size: DEFAULT_BATCH_SIZE,
41+
flush_interval: DEFAULT_BATCH_INTERVAL,
42+
logger: NoOpLogger.new
43+
)
44+
@event_queue = event_queue
45+
@logger = logger
46+
@event_dispatcher = event_dispatcher
47+
@batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
48+
batch_size
49+
else
50+
@logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
51+
DEFAULT_BATCH_SIZE
52+
end
53+
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
54+
@mutex = Mutex.new
55+
@received = ConditionVariable.new
56+
@current_batch = []
57+
@is_started = false
58+
start!
59+
end
60+
61+
def start!
62+
if @is_started == true
63+
@logger.log(Logger::WARN, 'Service already started.')
64+
return
65+
end
66+
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
67+
@thread = Thread.new { run }
68+
@is_started = true
69+
end
70+
71+
def flush
72+
@mutex.synchronize do
73+
@event_queue << FLUSH_SIGNAL
74+
@received.signal
75+
end
76+
end
77+
78+
def process(user_event)
79+
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
80+
81+
unless @thread.alive?
82+
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
83+
return
84+
end
85+
86+
@mutex.synchronize do
87+
begin
88+
@event_queue << user_event
89+
@received.signal
90+
rescue Exception
91+
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
92+
return
93+
end
94+
end
95+
end
96+
97+
def stop!
98+
return unless @thread.alive?
99+
100+
@mutex.synchronize do
101+
@event_queue << SHUTDOWN_SIGNAL
102+
@received.signal
103+
end
104+
105+
@is_started = false
106+
@logger.log(Logger::WARN, 'Stopping scheduler.')
107+
@thread.exit
108+
end
109+
110+
private
111+
112+
def run
113+
loop do
114+
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
115+
@logger.log(
116+
Logger::DEBUG,
117+
'Deadline exceeded flushing current batch.'
118+
)
119+
flush_queue!
120+
end
121+
122+
item = nil
123+
124+
@mutex.synchronize do
125+
@received.wait(@mutex, 0.05)
126+
item = @event_queue.pop if @event_queue.length.positive?
127+
end
128+
129+
if item.nil?
130+
sleep(0.05)
131+
next
132+
end
133+
134+
if item == SHUTDOWN_SIGNAL
135+
@logger.log(Logger::INFO, 'Received shutdown signal.')
136+
break
137+
end
138+
139+
if item == FLUSH_SIGNAL
140+
@logger.log(Logger::DEBUG, 'Received flush signal.')
141+
flush_queue!
142+
next
143+
end
144+
145+
add_to_batch(item) if item.is_a? Optimizely::UserEvent
146+
end
147+
end
148+
149+
def flush_queue!
150+
return if @current_batch.empty?
151+
152+
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
153+
begin
154+
@event_dispatcher.dispatch_event(log_event)
155+
rescue StandardError => e
156+
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}")
157+
end
158+
@current_batch = []
159+
end
160+
161+
def add_to_batch(user_event)
162+
if should_split?(user_event)
163+
flush_queue!
164+
@current_batch = []
165+
end
166+
167+
# Reset the deadline if starting a new batch.
168+
@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
169+
170+
@logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
171+
@current_batch << user_event
172+
return unless @current_batch.length >= @batch_size
173+
174+
@logger.log(Logger::DEBUG, 'Flushing on max batch size!')
175+
flush_queue!
176+
end
177+
178+
def should_split?(user_event)
179+
return false if @current_batch.empty?
180+
181+
current_context = @current_batch.last.event_context
182+
new_context = user_event.event_context
183+
184+
# Revisions should match
185+
unless current_context[:revision] == new_context[:revision]
186+
@logger.log(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.')
187+
return true
188+
end
189+
190+
# Projects should match
191+
unless current_context[:project_id] == new_context[:project_id]
192+
@logger.log(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.')
193+
return true
194+
end
195+
false
196+
end
197+
198+
def positive_number?(value)
199+
# Returns true if the given value is positive finite number.
200+
# false otherwise.
201+
Helpers::Validator.finite_number?(value) && value.positive?
202+
end
203+
end
204+
end

lib/optimizely/event/event_factory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def build_attribute_list(user_attributes, project_config)
8787
)
8888
end
8989

90-
return unless Helpers::Validator.boolean? project_config.bot_filtering
90+
return visitor_attributes unless Helpers::Validator.boolean? project_config.bot_filtering
9191

9292
# Append Bot Filtering Attribute
9393
visitor_attributes.push(
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true
2+
3+
#
4+
# Copyright 2019, Optimizely and contributors
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
module Optimizely
19+
class EventProcessor
20+
# EventProcessor interface is used to provide an intermediary processing stage within
21+
# event production. It's assumed that the EventProcessor dispatches events via a provided
22+
# EventDispatcher.
23+
def process(user_event); end
24+
end
25+
end

lib/optimizely/helpers/date_time_utils.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ module DateTimeUtils
2121
module_function
2222

2323
def create_timestamp
24-
# Returns Integer Current timestamp
25-
26-
(Time.now.to_f * 1000).to_i
24+
# Returns Integer current UTC timestamp
25+
utc = Time.now.getutc
26+
(utc.to_f * 1000).to_i
2727
end
2828
end
2929
end

0 commit comments

Comments
 (0)