Skip to content

Commit d5cfc8c

Browse files
authored
Merge pull request #529 from AmbireTech/improve-sentry-event-submission-performance
Improve sentry event submission performance
2 parents 96734ad + bdb7e04 commit d5cfc8c

File tree

13 files changed

+293
-192
lines changed

13 files changed

+293
-192
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/src/test_util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,14 @@ mod logger {
149149
pub static DUMMY_VALIDATOR_LEADER: Lazy<ValidatorDesc> = Lazy::new(|| ValidatorDesc {
150150
id: IDS[&LEADER],
151151
url: "http://localhost:8005".to_string(),
152-
fee: UnifiedNum::from_whole(0.02),
152+
fee: UnifiedNum::from_whole(0.03),
153153
fee_addr: None,
154154
});
155155

156156
pub static DUMMY_VALIDATOR_FOLLOWER: Lazy<ValidatorDesc> = Lazy::new(|| ValidatorDesc {
157157
id: IDS[&FOLLOWER],
158158
url: "http://localhost:8006".to_string(),
159-
fee: UnifiedNum::from_whole(0.03),
159+
fee: UnifiedNum::from_whole(0.02),
160160
fee_addr: None,
161161
});
162162

sentry/Makefile.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ args = [
5252
"-t3",
5353
"-c100",
5454
"-d30s",
55-
"-R2000",
55+
"-R3000",
5656
"--latency",
5757
"http://127.0.0.1:8005/v5/campaign",
5858
]

sentry/benchmark/multiple_benchmark.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
-- Multiple campaigns with a single event in the request
2+
--
13
-- This script will submit events for 3 campaigns
24
-- The 3 campaigns can be found in `primitives::test_util`
35
wrk.method = "POST"
@@ -7,7 +9,6 @@ wrk.headers["Content-Type"] = "application/json"
79
-- uses the DUMMY_AUTH[CREATOR] token
810
-- wrk.headers["authorization"] = "Bearer AUTH_awesomeCreator:chain_id:1337"
911

10-
1112
init = function(args)
1213
local r = {}
1314

sentry/benchmark/multiple_events.lua

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
-- Multiple events to multiple campaigns
2+
--
3+
-- This script will submit events for 3 campaigns
4+
-- The 3 campaigns can be found in `primitives::test_util`
5+
-- Each requests consist of 2 events - IMPRESSION & CLICK
6+
-- and each event has differnet publihser (PUBLIHSER & PUBLISHER_2) as well as different AdUnit & AdSlot
7+
-- The same events are used for all campaigns.
8+
wrk.method = "POST"
9+
-- uses the PUBLISHER (for IMPRESSION) & PUBLISHER_2 (for CLICK) address
10+
wrk.body = "{ \"events\": [ {\"type\": \"IMPRESSION\", \"publisher\": \"0xE882ebF439207a70dDcCb39E13CA8506c9F45fD9\", \"adUnit\": \"Qmasg8FrbuSQpjFu3kRnZF9beg8rEBFrqgi1uXDRwCbX5f\", \"adSlot\": \"QmcUVX7fvoLMM93uN2bD3wGTH8MXSxeL8hojYfL2Lhp7mR\"} ] }"
11+
wrk.headers["Content-Type"] = "application/json"
12+
-- uses the DUMMY_AUTH[CREATOR] token
13+
-- wrk.headers["authorization"] = "Bearer AUTH_awesomeCreator:chain_id:1337"
14+
15+
16+
init = function(args)
17+
local r = {}
18+
19+
-- with 2 different publishers (PUBLISHER, PUBLISHER_2), different AdUnits & different AdSlots
20+
local two_events_body = "{ \"events\": [ {\"type\": \"IMPRESSION\", \"publisher\": \"0xE882ebF439207a70dDcCb39E13CA8506c9F45fD9\", \"adUnit\": \"Qmasg8FrbuSQpjFu3kRnZF9beg8rEBFrqgi1uXDRwCbX5f\", \"adSlot\": \"QmcUVX7fvoLMM93uN2bD3wGTH8MXSxeL8hojYfL2Lhp7mR\"}, {\"type\": \"CLICK\", \"publisher\": \"0x0e880972A4b216906F05D67EeaaF55d16B5EE4F1\", \"adUnit\": \"QmQnu8zrHsuVvnTJsEgDHYA8c1MmRL7YLiMD8uzDUJKcNq\", \"adSlot\": \"QmYYBULc9QDEaDr8HAXvVWHDmFfL2GvyumYRr1g4ERBC96\"} ] }"
21+
22+
-- Campaign 1
23+
r[1] = wrk.format(nil, "/v5/campaign/0x936da01f9abd4d9d80c702af85c822a8/events", nil, two_events_body)
24+
-- Campaign 2
25+
r[2] = wrk.format(nil, "/v5/campaign/0x127b98248f4e4b73af409d10f62daeaa/events", nil, two_events_body)
26+
-- Campaign 3
27+
r[3] = wrk.format(nil, "/v5/campaign/0xa78f3492481b41a688488a7aa1ff17df/events", nil, two_events_body)
28+
29+
req = table.concat(r)
30+
end
31+
32+
request = function()
33+
return req
34+
end

sentry/migrations/20190806011140_initial-tables/up.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ CREATE TABLE accounting (
7272
side AccountingSide NOT NULL,
7373
"address" varchar(42) NOT NULL,
7474
amount bigint NOT NULL,
75-
updated timestamp(2) with time zone DEFAULT NULL NULL,
75+
updated timestamp(2) with time zone DEFAULT NULL,
7676
created timestamp(2) with time zone NOT NULL,
7777
-- Do not rename the Primary key constraint (`accounting_pkey`)!
7878
PRIMARY KEY (channel_id, side, "address"),

sentry/src/access.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub async fn check_access(
4242
return Err(Error::ForbiddenReferrer);
4343
}
4444

45+
// Used when `Campaign::event_submission` is `None`
4546
let default_rules = [
4647
Rule {
4748
uids: Some(vec![campaign.creator.to_string()]),
@@ -84,7 +85,7 @@ pub async fn check_access(
8485
}
8586

8687
async fn apply_rule(
87-
redis: MultiplexedConnection,
88+
mut redis: MultiplexedConnection,
8889
rule: &Rule,
8990
events: &[Event],
9091
campaign: &Campaign,
@@ -116,7 +117,7 @@ async fn apply_rule(
116117

117118
if redis::cmd("EXISTS")
118119
.arg(&key)
119-
.query_async::<_, i8>(&mut redis.clone())
120+
.query_async::<_, i8>(&mut redis)
120121
.await
121122
.map(|exists| exists == 1)
122123
.map_err(|error| format!("{}", error))?
@@ -129,7 +130,7 @@ async fn apply_rule(
129130
.arg(&key)
130131
.arg(seconds as i32)
131132
.arg("1")
132-
.query_async::<_, ()>(&mut redis.clone())
133+
.query_async::<_, ()>(&mut redis)
133134
.await
134135
.map_err(|error| format!("{}", error))
135136
}

sentry/src/analytics.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
db::{analytics::update_analytics, DbPool, PoolError},
33
Session,
44
};
5+
use futures::future::join_all;
56
use primitives::{
67
analytics::OperatingSystem,
78
sentry::{DateHour, Event, UpdateAnalytics},
@@ -14,7 +15,7 @@ pub async fn record(
1415
pool: &DbPool,
1516
campaign_context: &ChainOf<Campaign>,
1617
session: &Session,
17-
events_with_payouts: Vec<(Event, Address, UnifiedNum)>,
18+
events_with_payouts: &[(Event, Address, UnifiedNum)],
1819
) -> Result<(), PoolError> {
1920
let os_name = session
2021
.os
@@ -65,9 +66,9 @@ pub async fn record(
6566
};
6667

6768
batch_update
68-
.entry(event)
69+
.entry(event.clone())
6970
.and_modify(|analytics| {
70-
analytics.amount_to_add += &payout_amount;
71+
analytics.amount_to_add += payout_amount;
7172
analytics.count_to_add += 1;
7273
})
7374
.or_insert_with(|| UpdateAnalytics {
@@ -83,14 +84,24 @@ pub async fn record(
8384
os_name: os_name.clone(),
8485
chain_id: campaign_context.chain.chain_id,
8586
event_type,
86-
amount_to_add: payout_amount,
87+
amount_to_add: *payout_amount,
8788
count_to_add: 1,
8889
});
8990
}
9091

91-
for (_event, update) in batch_update.into_iter() {
92-
update_analytics(pool, update).await?;
93-
}
92+
let batch_futures = join_all(
93+
batch_update
94+
.into_iter()
95+
.map(|(_event, update)| update_analytics(pool, update)),
96+
);
97+
98+
// execute the batched futures, collect the result afterwards,
99+
// in order execute all futures first and then return an error if occurred
100+
batch_futures
101+
.await
102+
.into_iter()
103+
.collect::<Result<Vec<_>, _>>()?;
104+
94105
Ok(())
95106
}
96107

@@ -192,7 +203,7 @@ mod test {
192203
let channel_context = channel_chain.with_channel(dummy_channel);
193204
let campaign_context = channel_context.clone().with(campaign);
194205

195-
record(&app.pool, &campaign_context, &session, input_events.clone())
206+
record(&app.pool, &campaign_context, &session, &input_events)
196207
.await
197208
.expect("should record");
198209

@@ -256,7 +267,7 @@ mod test {
256267
.expect("Channel token should be whitelisted in config!");
257268
let channel_context = channel_chain.with_channel(dummy_channel);
258269
let campaign_context = channel_context.clone().with(campaign);
259-
record(&app.pool, &campaign_context, &session, input_events.clone())
270+
record(&app.pool, &campaign_context, &session, &input_events)
260271
.await
261272
.expect("should record");
262273

sentry/src/db/accounting.rs

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use std::fmt;
2+
13
use chrono::{DateTime, Utc};
4+
use futures::future::{join, join_all};
25
use primitives::{
36
balances::{Balances, CheckedState},
47
Address, ChannelId, UnifiedNum,
@@ -11,7 +14,7 @@ use tokio_postgres::{
1114
use super::{DbPool, PoolError};
1215
use thiserror::Error;
1316

14-
static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created";
17+
static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, NULL, NOW()) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + EXCLUDED.amount, updated = NOW() WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created";
1518

1619
#[derive(Debug, Error)]
1720
pub enum Error {
@@ -57,12 +60,16 @@ pub enum Side {
5760
Spender,
5861
}
5962

60-
pub enum SpendError {
61-
Pool(PoolError),
62-
NoRecordsUpdated,
63+
impl fmt::Display for Side {
64+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65+
match self {
66+
Side::Earner => write!(f, "Earner"),
67+
Side::Spender => write!(f, "Spender"),
68+
}
69+
}
6370
}
6471

65-
/// ```text
72+
/// ```sql
6673
/// SELECT channel_id, side, address, amount, updated, created FROM accounting WHERE channel_id = $1 AND address = $2 AND side = $3
6774
/// ```
6875
pub async fn get_accounting(
@@ -110,14 +117,8 @@ pub async fn update_accounting(
110117
let client = pool.get().await?;
111118
let statement = client.prepare(UPDATE_ACCOUNTING_STATEMENT).await?;
112119

113-
let now = Utc::now();
114-
let updated: Option<DateTime<Utc>> = None;
115-
116120
let row = client
117-
.query_one(
118-
&statement,
119-
&[&channel_id, &side, &address, &amount, &updated, &now],
120-
)
121+
.query_one(&statement, &[&channel_id, &side, &address, &amount])
121122
.await?;
122123

123124
Ok(Accounting::from(&row))
@@ -126,52 +127,63 @@ pub async fn update_accounting(
126127
/// `delta_balances` defines the Balances that need to be added to the spending or earnings of the `Accounting`s.
127128
/// It will **not** override the whole `Accounting` value
128129
/// Returns a tuple of `(Vec<Earners Accounting>, Vec<Spenders Accounting>)`
130+
///
131+
/// # Error
132+
///
133+
/// It will return an error if any of the updates fails but it would have updated the rest of them.
134+
///
135+
/// This way we ensure that even if a single or multiple Accounting updates fail,
136+
/// we will still pay out the rest of them.
129137
pub async fn spend_amount(
130138
pool: DbPool,
131139
channel_id: ChannelId,
132140
delta_balances: Balances<CheckedState>,
133141
) -> Result<(Vec<Accounting>, Vec<Accounting>), PoolError> {
134-
let client = pool.get().await?;
135-
136-
let statement = client.prepare(UPDATE_ACCOUNTING_STATEMENT).await?;
142+
let client = &pool.get().await?;
137143

138-
let now = Utc::now();
139-
let updated: Option<DateTime<Utc>> = None;
140-
141-
let (mut earners, mut spenders) = (vec![], vec![]);
144+
let statement = client.prepare_cached(UPDATE_ACCOUNTING_STATEMENT).await?;
142145

143146
// Earners
144-
for (earner, amount) in delta_balances.earners {
145-
let row = client
146-
.query_one(
147-
&statement,
148-
&[&channel_id, &Side::Earner, &earner, &amount, &updated, &now],
149-
)
150-
.await?;
147+
let earners_futures = delta_balances.earners.into_iter().map(|(earner, amount)| {
148+
let statement = statement.clone();
151149

152-
earners.push(Accounting::from(&row))
153-
}
150+
async move {
151+
client
152+
.query_one(&statement, &[&channel_id, &Side::Earner, &earner, &amount])
153+
.await
154+
.map(|row| Accounting::from(&row))
155+
}
156+
});
154157

155158
// Spenders
156-
for (spender, amount) in delta_balances.spenders {
157-
let row = client
158-
.query_one(
159-
&statement,
160-
&[
161-
&channel_id,
162-
&Side::Spender,
163-
&spender,
164-
&amount,
165-
&updated,
166-
&now,
167-
],
168-
)
169-
.await?;
159+
let spenders_futures = delta_balances
160+
.spenders
161+
.into_iter()
162+
.map(|(spender, amount)| {
163+
let statement = statement.clone();
164+
165+
async move {
166+
client
167+
.query_one(
168+
&statement,
169+
&[&channel_id, &Side::Spender, &spender, &amount],
170+
)
171+
.await
172+
.map(|row| Accounting::from(&row))
173+
}
174+
});
170175

171-
spenders.push(Accounting::from(&row))
172-
}
176+
let earners = join_all(earners_futures);
177+
let spenders = join_all(spenders_futures);
178+
179+
// collect all the Accounting updates into Vectors
180+
let (earners, spenders) = join(earners, spenders).await;
173181

174-
Ok((earners, spenders))
182+
// Return an error if any of the Accounting updates failed
183+
Ok((
184+
earners.into_iter().collect::<Result<_, _>>()?,
185+
spenders.into_iter().collect::<Result<_, _>>()?,
186+
))
175187
}
176188

177189
#[cfg(test)]
@@ -531,7 +543,7 @@ mod test {
531543
earners_acc
532544
.iter()
533545
.find(|a| a.address == earner)
534-
.unwrap()
546+
.expect("Should find Accounting")
535547
.clone(),
536548
false,
537549
);
@@ -540,7 +552,7 @@ mod test {
540552
earners_acc
541553
.iter()
542554
.find(|a| a.address == other_earner)
543-
.unwrap()
555+
.expect("Should find Accounting")
544556
.clone(),
545557
false,
546558
);
@@ -551,7 +563,7 @@ mod test {
551563
spenders_acc
552564
.iter()
553565
.find(|a| a.address == spender)
554-
.unwrap()
566+
.expect("Should find Accounting")
555567
.clone(),
556568
false,
557569
);
@@ -560,7 +572,7 @@ mod test {
560572
spenders_acc
561573
.iter()
562574
.find(|a| a.address == other_spender)
563-
.unwrap()
575+
.expect("Should find Accounting")
564576
.clone(),
565577
false,
566578
);

0 commit comments

Comments
 (0)