Skip to content

Commit 2df63ad

Browse files
msohailhussainMichael Ng
authored andcommitted
feat(integrateep): Integrate Event processor (#194)
1 parent c6a2bfe commit 2df63ad

File tree

7 files changed

+333
-49
lines changed

7 files changed

+333
-49
lines changed

lib/optimizely.rb

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
require_relative 'optimizely/decision_service'
2323
require_relative 'optimizely/error_handler'
2424
require_relative 'optimizely/event_builder'
25+
require_relative 'optimizely/event/forwarding_event_processor'
26+
require_relative 'optimizely/event/event_factory'
27+
require_relative 'optimizely/event/user_event_factory'
2528
require_relative 'optimizely/event_dispatcher'
2629
require_relative 'optimizely/exceptions'
2730
require_relative 'optimizely/helpers/constants'
@@ -35,8 +38,8 @@ module Optimizely
3538
class Project
3639
attr_reader :notification_center
3740
# @api no-doc
38-
attr_reader :config_manager, :decision_service, :error_handler,
39-
:event_builder, :event_dispatcher, :logger
41+
attr_reader :config_manager, :decision_service, :error_handler, :event_dispatcher,
42+
:event_processor, :logger, :stopped
4043

4144
# Constructor for Projects.
4245
#
@@ -51,6 +54,7 @@ class Project
5154
# Must provide at least one of datafile or sdk_key.
5255
# @param config_manager - Optional Responds to get_config.
5356
# @param notification_center - Optional Instance of NotificationCenter.
57+
# @param event_processor - Optional Responds to process.
5458

5559
def initialize(
5660
datafile = nil,
@@ -61,7 +65,8 @@ def initialize(
6165
user_profile_service = nil,
6266
sdk_key = nil,
6367
config_manager = nil,
64-
notification_center = nil
68+
notification_center = nil,
69+
event_processor = nil
6570
)
6671
@logger = logger || NoOpLogger.new
6772
@error_handler = error_handler || NoOpErrorHandler.new
@@ -91,8 +96,14 @@ def initialize(
9196
else
9297
StaticProjectConfigManager.new(datafile, @logger, @error_handler, skip_json_validation)
9398
end
99+
94100
@decision_service = DecisionService.new(@logger, @user_profile_service)
95-
@event_builder = EventBuilder.new(@logger)
101+
102+
@event_processor = if event_processor.respond_to?(:process)
103+
event_processor
104+
else
105+
ForwardingEventProcessor.new(@event_dispatcher, @logger)
106+
end
96107
end
97108

98109
# Buckets visitor and sends impression event to Optimizely.
@@ -243,19 +254,14 @@ def track(event_key, user_id, attributes = nil, event_tags = nil)
243254
return nil
244255
end
245256

246-
conversion_event = @event_builder.create_conversion_event(config, event, user_id, attributes, event_tags)
257+
user_event = UserEventFactory.create_conversion_event(config, event, user_id, attributes, event_tags)
258+
@event_processor.process(user_event)
247259
@logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.")
248-
@logger.log(Logger::INFO,
249-
"Dispatching conversion event to URL #{conversion_event.url} with params #{conversion_event.params}.")
250-
begin
251-
@event_dispatcher.dispatch_event(conversion_event)
252-
rescue => e
253-
@logger.log(Logger::ERROR, "Unable to dispatch conversion event. Error: #{e}")
254-
end
255260

261+
log_event = EventFactory.create_log_event(user_event, @logger)
256262
@notification_center.send_notifications(
257263
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
258-
event_key, user_id, attributes, event_tags, conversion_event
264+
event_key, user_id, attributes, event_tags, log_event
259265
)
260266
nil
261267
end
@@ -507,6 +513,14 @@ def is_valid
507513
config.is_a?(Optimizely::ProjectConfig)
508514
end
509515

516+
def close
517+
return if @stopped
518+
519+
@stopped = true
520+
@config_manager.stop! if @config_manager.respond_to?(:stop!)
521+
@event_processor.stop! if @event_processor.respond_to?(:stop!)
522+
end
523+
510524
private
511525

512526
def get_variation_with_config(experiment_key, user_id, attributes, config)
@@ -692,18 +706,15 @@ def validate_instantiation_options
692706
def send_impression(config, experiment, variation_key, user_id, attributes = nil)
693707
experiment_key = experiment['key']
694708
variation_id = config.get_variation_id_from_key(experiment_key, variation_key)
695-
impression_event = @event_builder.create_impression_event(config, experiment, variation_id, user_id, attributes)
696-
@logger.log(Logger::INFO,
697-
"Dispatching impression event to URL #{impression_event.url} with params #{impression_event.params}.")
698-
begin
699-
@event_dispatcher.dispatch_event(impression_event)
700-
rescue => e
701-
@logger.log(Logger::ERROR, "Unable to dispatch impression event. Error: #{e}")
702-
end
709+
user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes)
710+
@event_processor.process(user_event)
711+
712+
@logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.")
703713
variation = config.get_variation_from_id(experiment_key, variation_id)
714+
log_event = EventFactory.create_log_event(user_event, @logger)
704715
@notification_center.send_notifications(
705716
NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
706-
experiment, user_id, attributes, variation, impression_event
717+
experiment, user_id, attributes, variation, log_event
707718
)
708719
end
709720

lib/optimizely/config_manager/http_project_config_manager.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module Optimizely
3030
class HTTPProjectConfigManager < ProjectConfigManager
3131
# Config manager that polls for the datafile and updated ProjectConfig based on an update interval.
3232

33-
attr_reader :config
33+
attr_reader :config, :closed
3434

3535
# Initialize config manager. One of sdk_key or url has to be set to be able to use.
3636
#
@@ -72,6 +72,7 @@ def initialize(
7272
@last_modified = nil
7373
@async_scheduler = AsyncScheduler.new(method(:fetch_datafile_config), @polling_interval, auto_update, @logger)
7474
@async_scheduler.start! if start_by_default == true
75+
@closed = false
7576
@skip_json_validation = skip_json_validation
7677
@notification_center = notification_center.is_a?(Optimizely::NotificationCenter) ? notification_center : NotificationCenter.new(@logger, @error_handler)
7778
@config = datafile.nil? ? nil : DatafileProjectConfig.create(datafile, @logger, @error_handler, @skip_json_validation)
@@ -84,11 +85,24 @@ def ready?
8485
end
8586

8687
def start!
88+
if @closed
89+
@logger.log(Logger::WARN, 'Not starting. Already closed.')
90+
return
91+
end
92+
8793
@async_scheduler.start!
94+
@closed = false
8895
end
8996

9097
def stop!
98+
if @closed
99+
@logger.log(Logger::WARN, 'Not pausing. Manager has not been started.')
100+
return
101+
end
102+
91103
@async_scheduler.stop!
104+
@config = nil
105+
@closed = true
92106
end
93107

94108
def get_config

lib/optimizely/event/batch_event_processor.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor
2525
# the BlockingQueue and buffers them for either a configured batch size or for a
2626
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
2727

28-
attr_reader :event_queue, :current_batch, :batch_size, :flush_interval
28+
attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval
2929

3030
DEFAULT_BATCH_SIZE = 10
3131
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
@@ -54,18 +54,18 @@ def initialize(
5454
@mutex = Mutex.new
5555
@received = ConditionVariable.new
5656
@current_batch = []
57-
@is_started = false
57+
@started = false
5858
start!
5959
end
6060

6161
def start!
62-
if @is_started == true
62+
if @started == true
6363
@logger.log(Logger::WARN, 'Service already started.')
6464
return
6565
end
6666
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
6767
@thread = Thread.new { run }
68-
@is_started = true
68+
@started = true
6969
end
7070

7171
def flush
@@ -78,7 +78,7 @@ def flush
7878
def process(user_event)
7979
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
8080

81-
unless @thread.alive?
81+
if !@started || !@thread.alive?
8282
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
8383
return
8484
end
@@ -95,14 +95,14 @@ def process(user_event)
9595
end
9696

9797
def stop!
98-
return unless @thread.alive?
98+
return unless @started
9999

100100
@mutex.synchronize do
101101
@event_queue << SHUTDOWN_SIGNAL
102102
@received.signal
103103
end
104104

105-
@is_started = false
105+
@started = false
106106
@logger.log(Logger::WARN, 'Stopping scheduler.')
107107
@thread.exit
108108
end
@@ -153,7 +153,7 @@ def flush_queue!
153153
begin
154154
@event_dispatcher.dispatch_event(log_event)
155155
rescue StandardError => e
156-
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}")
156+
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
157157
end
158158
@current_batch = []
159159
end
@@ -167,7 +167,7 @@ def add_to_batch(user_event)
167167
# Reset the deadline if starting a new batch.
168168
@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
169169

170-
@logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
170+
@logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
171171
@current_batch << user_event
172172
return unless @current_batch.length >= @batch_size
173173

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
#
4+
# Copyright 2019, Optimizely and contributors
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
require_relative 'event_processor'
19+
module Optimizely
20+
class ForwardingEventProcessor < EventProcessor
21+
# ForwardingEventProcessor is a basic transformation stage for converting
22+
# the event batch into a LogEvent to be dispatched.
23+
def initialize(event_dispatcher, logger = nil)
24+
@event_dispatcher = event_dispatcher
25+
@logger = logger || NoOpLogger.new
26+
end
27+
28+
def process(user_event)
29+
log_event = Optimizely::EventFactory.create_log_event(user_event, @logger)
30+
31+
begin
32+
@event_dispatcher.dispatch_event(log_event)
33+
rescue StandardError => e
34+
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
35+
end
36+
end
37+
end
38+
end

spec/event/batch_event_processor_spec.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@
111111
expect(@event_dispatcher).to have_received(:dispatch_event).with(
112112
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
113113
).once
114-
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, "Adding user event: #{event['key']} to batch.").exactly(10).times
115114
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
116115
end
117116

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# frozen_string_literal: true
2+
3+
#
4+
# Copyright 2019, Optimizely and contributors
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
require 'spec_helper'
19+
require 'optimizely/event/forwarding_event_processor'
20+
require 'optimizely/event/user_event_factory'
21+
require 'optimizely/error_handler'
22+
require 'optimizely/helpers/date_time_utils'
23+
require 'optimizely/logger'
24+
describe Optimizely::ForwardingEventProcessor do
25+
let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON }
26+
let(:error_handler) { Optimizely::NoOpErrorHandler.new }
27+
let(:spy_logger) { spy('logger') }
28+
let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) }
29+
let(:event) { project_config.get_event_from_key('test_event') }
30+
let(:log_url) { 'https://logx.optimizely.com/v1/events' }
31+
let(:post_headers) { {'Content-Type' => 'application/json'} }
32+
33+
before(:example) do
34+
time_now = Time.now
35+
allow(Time).to receive(:now).and_return(time_now)
36+
allow(SecureRandom).to receive(:uuid).and_return('a68cf1ad-0393-4e18-af87-efe8f01a7c9c')
37+
38+
@event_dispatcher = Optimizely::EventDispatcher.new
39+
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
40+
@conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
41+
42+
@expected_endpoint = 'https://logx.optimizely.com/v1/events'
43+
@expected_conversion_params = {
44+
account_id: '12001',
45+
project_id: '111001',
46+
visitors: [{
47+
attributes: [{
48+
entity_id: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'],
49+
key: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'],
50+
type: 'custom',
51+
value: true
52+
}],
53+
visitor_id: 'test_user',
54+
snapshots: [{
55+
events: [{
56+
entity_id: '111095',
57+
timestamp: Optimizely::Helpers::DateTimeUtils.create_timestamp,
58+
uuid: 'a68cf1ad-0393-4e18-af87-efe8f01a7c9c',
59+
key: 'test_event'
60+
}]
61+
}]
62+
}],
63+
anonymize_ip: false,
64+
revision: '42',
65+
client_name: Optimizely::CLIENT_ENGINE,
66+
enrich_decisions: true,
67+
client_version: Optimizely::VERSION
68+
}
69+
end
70+
71+
describe '.process' do
72+
it 'should dispatch log event when valid event is provided' do
73+
forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
74+
@event_dispatcher, spy_logger
75+
)
76+
77+
forwarding_event_processor.process(@conversion_event)
78+
79+
expect(@event_dispatcher).to have_received(:dispatch_event).with(
80+
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
81+
).once
82+
end
83+
84+
it 'should log an error when dispatch event raises timeout exception' do
85+
log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
86+
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)
87+
88+
timeout_error = Timeout::Error.new
89+
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)
90+
91+
forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
92+
@event_dispatcher, spy_logger
93+
)
94+
95+
forwarding_event_processor.process(@conversion_event)
96+
97+
expect(spy_logger).to have_received(:log).once.with(
98+
Logger::ERROR,
99+
"Error dispatching event: #{log_event} Timeout::Error."
100+
)
101+
end
102+
end
103+
end

0 commit comments

Comments
 (0)