Skip to content

Commit 6aee22f

Browse files
committed
sentry - campaign - spending for events improvements
- Fix test after introducing spend_for_events - Improve analytics by spawning separate task to record them - analytics - fix bug with count - primitives - test_util - switch dummy leader & follower fees
1 parent 55ea459 commit 6aee22f

File tree

8 files changed

+232
-178
lines changed

8 files changed

+232
-178
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/src/test_util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,14 @@ mod logger {
149149
pub static DUMMY_VALIDATOR_LEADER: Lazy<ValidatorDesc> = Lazy::new(|| ValidatorDesc {
150150
id: IDS[&LEADER],
151151
url: "http://localhost:8005".to_string(),
152-
fee: UnifiedNum::from_whole(0.02),
152+
fee: UnifiedNum::from_whole(0.03),
153153
fee_addr: None,
154154
});
155155

156156
pub static DUMMY_VALIDATOR_FOLLOWER: Lazy<ValidatorDesc> = Lazy::new(|| ValidatorDesc {
157157
id: IDS[&FOLLOWER],
158158
url: "http://localhost:8006".to_string(),
159-
fee: UnifiedNum::from_whole(0.03),
159+
fee: UnifiedNum::from_whole(0.02),
160160
fee_addr: None,
161161
});
162162

sentry/src/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub async fn record(
6868
batch_update
6969
.entry(event.clone())
7070
.and_modify(|analytics| {
71-
analytics.amount_to_add += &payout_amount;
71+
analytics.amount_to_add += payout_amount;
7272
analytics.count_to_add += 1;
7373
})
7474
.or_insert_with(|| UpdateAnalytics {

sentry/src/db/accounting.rs

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use std::fmt;
2+
13
use chrono::{DateTime, Utc};
4+
use futures::future::{join, join_all};
25
use primitives::{
36
balances::{Balances, CheckedState},
47
Address, ChannelId, UnifiedNum,
@@ -11,7 +14,7 @@ use tokio_postgres::{
1114
use super::{DbPool, PoolError};
1215
use thiserror::Error;
1316

14-
static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created";
17+
static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, NULL, NOW()) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + EXCLUDED.amount, updated = NOW() WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created";
1518

1619
#[derive(Debug, Error)]
1720
pub enum Error {
@@ -57,12 +60,16 @@ pub enum Side {
5760
Spender,
5861
}
5962

60-
pub enum SpendError {
61-
Pool(PoolError),
62-
NoRecordsUpdated,
63+
impl fmt::Display for Side {
64+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65+
match self {
66+
Side::Earner => write!(f, "Earner"),
67+
Side::Spender => write!(f, "Spender"),
68+
}
69+
}
6370
}
6471

65-
/// ```text
72+
/// ```sql
6673
/// SELECT channel_id, side, address, amount, updated, created FROM accounting WHERE channel_id = $1 AND address = $2 AND side = $3
6774
/// ```
6875
pub async fn get_accounting(
@@ -110,14 +117,8 @@ pub async fn update_accounting(
110117
let client = pool.get().await?;
111118
let statement = client.prepare(UPDATE_ACCOUNTING_STATEMENT).await?;
112119

113-
let now = Utc::now();
114-
let updated: Option<DateTime<Utc>> = None;
115-
116120
let row = client
117-
.query_one(
118-
&statement,
119-
&[&channel_id, &side, &address, &amount, &updated, &now],
120-
)
121+
.query_one(&statement, &[&channel_id, &side, &address, &amount])
121122
.await?;
122123

123124
Ok(Accounting::from(&row))
@@ -126,52 +127,63 @@ pub async fn update_accounting(
126127
/// `delta_balances` defines the Balances that need to be added to the spending or earnings of the `Accounting`s.
127128
/// It will **not** override the whole `Accounting` value
128129
/// Returns a tuple of `(Vec<Earners Accounting>, Vec<Spenders Accounting>)`
130+
///
131+
/// # Error
132+
///
133+
/// It will return an error if any of the updates fails but it would have updated the rest of them.
134+
///
135+
/// This way we ensure that even if a single or multiple Accounting updates fail,
136+
/// we will still pay out the rest of them.
129137
pub async fn spend_amount(
130138
pool: DbPool,
131139
channel_id: ChannelId,
132140
delta_balances: Balances<CheckedState>,
133141
) -> Result<(Vec<Accounting>, Vec<Accounting>), PoolError> {
134-
let client = pool.get().await?;
135-
136-
let statement = client.prepare(UPDATE_ACCOUNTING_STATEMENT).await?;
142+
let client = &pool.get().await?;
137143

138-
let now = Utc::now();
139-
let updated: Option<DateTime<Utc>> = None;
140-
141-
let (mut earners, mut spenders) = (vec![], vec![]);
144+
let statement = client.prepare_cached(UPDATE_ACCOUNTING_STATEMENT).await?;
142145

143146
// Earners
144-
for (earner, amount) in delta_balances.earners {
145-
let row = client
146-
.query_one(
147-
&statement,
148-
&[&channel_id, &Side::Earner, &earner, &amount, &updated, &now],
149-
)
150-
.await?;
147+
let earners_futures = delta_balances.earners.into_iter().map(|(earner, amount)| {
148+
let statement = statement.clone();
151149

152-
earners.push(Accounting::from(&row))
153-
}
150+
async move {
151+
client
152+
.query_one(&statement, &[&channel_id, &Side::Earner, &earner, &amount])
153+
.await
154+
.map(|row| Accounting::from(&row))
155+
}
156+
});
154157

155158
// Spenders
156-
for (spender, amount) in delta_balances.spenders {
157-
let row = client
158-
.query_one(
159-
&statement,
160-
&[
161-
&channel_id,
162-
&Side::Spender,
163-
&spender,
164-
&amount,
165-
&updated,
166-
&now,
167-
],
168-
)
169-
.await?;
159+
let spenders_futures = delta_balances
160+
.spenders
161+
.into_iter()
162+
.map(|(spender, amount)| {
163+
let statement = statement.clone();
164+
165+
async move {
166+
client
167+
.query_one(
168+
&statement,
169+
&[&channel_id, &Side::Spender, &spender, &amount],
170+
)
171+
.await
172+
.map(|row| Accounting::from(&row))
173+
}
174+
});
170175

171-
spenders.push(Accounting::from(&row))
172-
}
176+
let earners = join_all(earners_futures);
177+
let spenders = join_all(spenders_futures);
178+
179+
// collect all the Accounting updates into Vectors
180+
let (earners, spenders) = join(earners, spenders).await;
173181

174-
Ok((earners, spenders))
182+
// Return an error if any of the Accounting updates failed
183+
Ok((
184+
earners.into_iter().collect::<Result<_, _>>()?,
185+
spenders.into_iter().collect::<Result<_, _>>()?,
186+
))
175187
}
176188

177189
#[cfg(test)]
@@ -531,7 +543,7 @@ mod test {
531543
earners_acc
532544
.iter()
533545
.find(|a| a.address == earner)
534-
.unwrap()
546+
.expect("Should find Accounting")
535547
.clone(),
536548
false,
537549
);
@@ -540,7 +552,7 @@ mod test {
540552
earners_acc
541553
.iter()
542554
.find(|a| a.address == other_earner)
543-
.unwrap()
555+
.expect("Should find Accounting")
544556
.clone(),
545557
false,
546558
);
@@ -551,7 +563,7 @@ mod test {
551563
spenders_acc
552564
.iter()
553565
.find(|a| a.address == spender)
554-
.unwrap()
566+
.expect("Should find Accounting")
555567
.clone(),
556568
false,
557569
);
@@ -560,7 +572,7 @@ mod test {
560572
spenders_acc
561573
.iter()
562574
.find(|a| a.address == other_spender)
563-
.unwrap()
575+
.expect("Should find Accounting")
564576
.clone(),
565577
false,
566578
);

sentry/src/db/analytics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ pub async fn update_analytics(
124124
let client = pool.get().await?;
125125

126126
let query = "INSERT INTO analytics(campaign_id, time, ad_unit, ad_slot, ad_slot_type, advertiser, publisher, hostname, country, os_name, chain_id, event_type, payout_amount, payout_count)
127-
VALUES ($1, date_trunc('hour', cast($2 as timestamp with time zone)), $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
127+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
128128
ON CONFLICT ON CONSTRAINT analytics_pkey DO UPDATE
129-
SET payout_amount = analytics.payout_amount + $13, payout_count = analytics.payout_count + 1
129+
SET payout_amount = analytics.payout_amount + EXCLUDED.payout_amount, payout_count = analytics.payout_count + EXCLUDED.payout_count
130130
RETURNING campaign_id, time, ad_unit, ad_slot, ad_slot_type, advertiser, publisher, hostname, country, os_name, event_type, payout_amount, payout_count";
131131

132-
let stmt = client.prepare(query).await?;
132+
let stmt = client.prepare_cached(query).await?;
133133

134134
let row = client
135135
.query_one(

sentry/src/payout.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::cmp::{max, min};
1111

1212
pub type Result = std::result::Result<Option<(Address, UnifiedNum)>, Error>;
1313

14-
/// If None is returned this means that the targeting rules evaluation has set `show = false`
14+
/// If `None` is returned this means that the targeting rules evaluation has set `show = false`
1515
pub fn get_payout(
1616
logger: &Logger,
1717
campaign: &Campaign,

0 commit comments

Comments
 (0)