1
- use std:: collections:: HashSet ;
1
+ use std:: { collections:: HashSet , sync :: Arc } ;
2
2
3
- use adapter:: client:: Locked ;
3
+ use axum:: {
4
+ extract:: TypedHeader ,
5
+ headers:: UserAgent ,
6
+ http:: header:: { HeaderMap , HeaderName } ,
7
+ Extension , Json ,
8
+ } ;
4
9
use chrono:: Utc ;
5
10
use futures:: future:: try_join_all;
6
- use hyper:: { header:: USER_AGENT , Body , Request , Response } ;
7
- use hyper:: {
8
- header:: { HeaderName , CONTENT_TYPE } ,
9
- StatusCode ,
10
- } ;
11
11
use once_cell:: sync:: Lazy ;
12
+ use reqwest:: Url ;
13
+ use serde:: { Deserialize , Serialize } ;
14
+ use slog:: { debug, error, warn, Logger } ;
15
+ use woothee:: { parser:: Parser , woothee:: VALUE_UNKNOWN } ;
16
+
17
+ use adapter:: client:: Locked ;
12
18
use primitives:: {
13
19
sentry:: IMPRESSION ,
14
20
supermarket:: units_for_slot:: response,
15
21
supermarket:: units_for_slot:: response:: Response as UnitsForSlotResponse ,
16
22
targeting:: { eval_with_callback, get_pricing_bounds, input, input:: Input , Output } ,
17
23
AdSlot , AdUnit , Address , Campaign , Config , UnifiedNum , ValidatorId ,
18
24
} ;
19
- use reqwest:: Url ;
20
- use serde:: { Deserialize , Serialize } ;
21
- use slog:: { debug, error, warn, Logger } ;
22
- use woothee:: { parser:: Parser , woothee:: VALUE_UNKNOWN } ;
23
25
24
26
use crate :: {
25
27
db:: {
26
28
accounting:: { get_accounting, Side } ,
27
29
units_for_slot_get_campaigns, CampaignRemaining , DbPool ,
28
30
} ,
29
- Application , ResponseError ,
31
+ response:: ResponseError ,
32
+ Application ,
30
33
} ;
31
34
32
35
pub ( crate ) static CLOUDFLARE_IPCOUNTRY_HEADER : Lazy < HeaderName > =
@@ -43,36 +46,15 @@ pub struct RequestBody {
43
46
pub deposit_assets : Option < HashSet < Address > > ,
44
47
}
45
48
46
- pub ( crate ) fn not_found ( ) -> Response < Body > {
47
- Response :: builder ( )
48
- . status ( StatusCode :: NOT_FOUND )
49
- . body ( Body :: empty ( ) )
50
- . expect ( "Not Found response should be valid" )
51
- }
52
-
53
- pub ( crate ) fn service_unavailable ( ) -> Response < Body > {
54
- Response :: builder ( )
55
- . status ( StatusCode :: SERVICE_UNAVAILABLE )
56
- . body ( Body :: empty ( ) )
57
- . expect ( "Bad Request response should be valid" )
58
- }
59
-
60
49
pub async fn post_units_for_slot < C > (
61
- req : Request < Body > ,
62
- app : & Application < C > ,
63
- ) -> Result < Response < Body > , ResponseError >
50
+ Extension ( app) : Extension < Arc < Application < C > > > ,
51
+ Json ( request_body) : Json < RequestBody > ,
52
+ user_agent : Option < TypedHeader < UserAgent > > ,
53
+ headers : HeaderMap ,
54
+ ) -> Result < Json < UnitsForSlotResponse > , ResponseError >
64
55
where
65
56
C : Locked + ' static ,
66
57
{
67
- let logger = & app. logger ;
68
- let config = & app. config ;
69
-
70
- let ( request_parts, body) = req. into_parts ( ) ;
71
-
72
- let body_bytes = hyper:: body:: to_bytes ( body) . await ?;
73
-
74
- let request_body = serde_json:: from_slice :: < RequestBody > ( & body_bytes) ?;
75
-
76
58
let ad_slot = request_body. ad_slot . clone ( ) ;
77
59
78
60
// TODO: remove once we know how/where we will be fetching the rest of the information!
@@ -87,40 +69,45 @@ where
87
69
{
88
70
Ok ( units) => units,
89
71
Err ( error) => {
90
- error ! ( & logger, "Error fetching AdUnits for AdSlot" ; "AdSlot" => ?ad_slot, "error" => ?error) ;
72
+ error ! ( & app . logger, "Error fetching AdUnits for AdSlot" ; "AdSlot" => ?ad_slot, "error" => ?error) ;
91
73
92
- return Ok ( service_unavailable ( ) ) ;
74
+ return Err ( ResponseError :: BadRequest (
75
+ "Error fetching AdUnits for AdSlot. Please try again later." . into ( ) ,
76
+ ) ) ;
93
77
}
94
78
} ;
95
79
96
80
let fallback_unit: Option < AdUnit > = match & ad_slot. fallback_unit {
97
81
Some ( unit_ipfs) => {
98
82
let ad_unit_response = match app. platform_api . fetch_unit ( * unit_ipfs) . await {
99
83
Ok ( Some ( response) ) => {
100
- debug ! ( & logger, "Fetched AdUnit {:?}" , unit_ipfs; "AdUnit" => ?unit_ipfs) ;
84
+ debug ! ( & app . logger, "Fetched AdUnit {:?}" , unit_ipfs; "AdUnit" => ?unit_ipfs) ;
101
85
response
102
86
}
103
87
Ok ( None ) => {
104
88
warn ! (
105
- & logger,
89
+ & app . logger,
106
90
"AdSlot fallback AdUnit {} not found in Platform" ,
107
91
unit_ipfs;
108
92
"AdUnit" => ?unit_ipfs,
109
93
"AdSlot" => ?ad_slot,
110
94
) ;
111
95
112
- return Ok ( not_found ( ) ) ;
96
+ return Err ( ResponseError :: NotFound ) ;
113
97
}
114
98
Err ( error) => {
115
- error ! ( & logger,
99
+ error ! ( & app . logger,
116
100
"Error when fetching AdSlot fallback AdUnit ({}) from Platform" ,
117
101
unit_ipfs;
118
102
"AdSlot" => ?ad_slot,
119
103
"Fallback AdUnit" => ?unit_ipfs,
120
104
"error" => ?error
121
105
) ;
122
106
123
- return Ok ( service_unavailable ( ) ) ;
107
+ return Err ( ResponseError :: BadRequest (
108
+ "Error when fetching AdSlot fallback AdUnit. Please try again later."
109
+ . into ( ) ,
110
+ ) ) ;
124
111
}
125
112
} ;
126
113
@@ -129,44 +116,35 @@ where
129
116
None => None ,
130
117
} ;
131
118
132
- debug ! ( & logger, "Fetched {} AdUnits for AdSlot" , units. len( ) ; "AdSlot" => ?ad_slot) ;
119
+ debug ! ( & app . logger, "Fetched {} AdUnits for AdSlot" , units. len( ) ; "AdSlot" => ?ad_slot) ;
133
120
134
121
// For each adUnits apply input
135
122
let ua_parser = Parser :: new ( ) ;
136
- let user_agent = request_parts
137
- . headers
138
- . get ( USER_AGENT )
139
- . and_then ( |h| h. to_str ( ) . map ( ToString :: to_string) . ok ( ) )
123
+ let user_agent = user_agent
124
+ . map ( |h| h. as_str ( ) . to_string ( ) )
140
125
. unwrap_or_default ( ) ;
141
126
let parsed = ua_parser. parse ( & user_agent) ;
142
127
// WARNING! This will return only the OS type, e.g. `Linux` and not the actual distribution name e.g. `Ubuntu`
143
128
// By contrast `ua-parser-js` will return `Ubuntu` (distribution) and not the OS type `Linux`.
144
129
// `UAParser(...).os.name` (`ua-parser-js: 0.7.22`)
145
- let user_agent_os = parsed
146
- . as_ref ( )
147
- . map ( |p| {
148
- if p. os != VALUE_UNKNOWN {
149
- Some ( p. os . to_string ( ) )
150
- } else {
151
- None
152
- }
153
- } )
154
- . flatten ( ) ;
130
+ let user_agent_os = parsed. as_ref ( ) . and_then ( |p| {
131
+ if p. os != VALUE_UNKNOWN {
132
+ Some ( p. os . to_string ( ) )
133
+ } else {
134
+ None
135
+ }
136
+ } ) ;
155
137
156
138
// Corresponds to `UAParser(...).browser.name` (`ua-parser-js: 0.7.22`)
157
- let user_agent_browser_family = parsed
158
- . as_ref ( )
159
- . map ( |p| {
160
- if p. name != VALUE_UNKNOWN {
161
- Some ( p. name . to_string ( ) )
162
- } else {
163
- None
164
- }
165
- } )
166
- . flatten ( ) ;
139
+ let user_agent_browser_family = parsed. as_ref ( ) . and_then ( |p| {
140
+ if p. name != VALUE_UNKNOWN {
141
+ Some ( p. name . to_string ( ) )
142
+ } else {
143
+ None
144
+ }
145
+ } ) ;
167
146
168
- let country = request_parts
169
- . headers
147
+ let country = headers
170
148
. get ( CLOUDFLARE_IPCOUNTRY_HEADER . clone ( ) )
171
149
. and_then ( |h| h. to_str ( ) . map ( ToString :: to_string) . ok ( ) ) ;
172
150
@@ -180,15 +158,15 @@ where
180
158
let campaigns_limited_by_earner = get_campaigns (
181
159
app. pool . clone ( ) ,
182
160
app. campaign_remaining . clone ( ) ,
183
- config,
161
+ & app . config ,
184
162
& request_body. deposit_assets ,
185
163
publisher_id,
186
164
)
187
165
. await
188
166
// TODO: Fix mapping this error and Log the error!
189
167
. map_err ( |err| ResponseError :: BadRequest ( err. to_string ( ) ) ) ?;
190
168
191
- debug ! ( & logger, "Fetched Cache campaigns limited by earner (publisher)" ; "campaigns" => campaigns_limited_by_earner. len( ) , "publisher_id" => %publisher_id) ;
169
+ debug ! ( & app . logger, "Fetched Cache campaigns limited by earner (publisher)" ; "campaigns" => campaigns_limited_by_earner. len( ) , "publisher_id" => %publisher_id) ;
192
170
193
171
// We return those in the result (which means AdView would have those) but we don't actually use them
194
172
// we do that in order to have the same variables as the validator, so that the `price` is the same
@@ -205,7 +183,7 @@ where
205
183
ad_slot_type : ad_slot. ad_type . clone ( ) ,
206
184
publisher_id : publisher_id. to_address ( ) ,
207
185
country,
208
- event_type : IMPRESSION . into ( ) ,
186
+ event_type : IMPRESSION ,
209
187
seconds_since_epoch : Utc :: now ( ) ,
210
188
user_agent_os,
211
189
user_agent_browser_family : user_agent_browser_family. clone ( ) ,
@@ -217,8 +195,8 @@ where
217
195
} ;
218
196
219
197
let campaigns = apply_targeting (
220
- config,
221
- logger,
198
+ & app . config ,
199
+ & app . logger ,
222
200
campaigns_limited_by_earner,
223
201
targeting_input_base. clone ( ) ,
224
202
ad_slot,
@@ -234,11 +212,7 @@ where
234
212
fallback_unit : fallback_unit. map ( |ad_unit| response:: AdUnit :: from ( & ad_unit) ) ,
235
213
} ;
236
214
237
- Ok ( Response :: builder ( )
238
- . status ( StatusCode :: OK )
239
- . header ( CONTENT_TYPE , "application/json" )
240
- . body ( Body :: from ( serde_json:: to_string ( & response) ?) )
241
- . expect ( "Should create response" ) )
215
+ Ok ( Json ( response) )
242
216
}
243
217
244
218
// TODO: Use error instead of std::error::Error
@@ -267,7 +241,7 @@ async fn get_campaigns(
267
241
268
242
// 2. Check those Campaigns if `Campaign remaining > 0` (in redis)
269
243
let campaigns_remaining = campaign_remaining
270
- . get_multiple_with_ids ( & active_campaign_ids)
244
+ . get_multiple_with_ids ( active_campaign_ids)
271
245
. await ?;
272
246
273
247
let campaigns_with_remaining = campaigns_remaining
@@ -301,16 +275,15 @@ async fn get_campaigns(
301
275
} ) )
302
276
. await ?
303
277
. into_iter ( )
304
- . filter_map ( |accounting| accounting )
278
+ . flatten ( )
305
279
. collect :: < Vec < _ > > ( ) ;
306
280
307
281
// 3. Filter `Campaign`s, that include the `publisher_id` in the Channel balances.
308
282
let ( mut campaigns_by_earner, rest_of_campaigns) : ( Vec < Campaign > , Vec < Campaign > ) =
309
283
campaigns_with_remaining. into_iter ( ) . partition ( |campaign| {
310
284
publisher_accountings
311
285
. iter ( )
312
- . find ( |accounting| accounting. channel_id == campaign. channel . id ( ) )
313
- . is_some ( )
286
+ . any ( |accounting| accounting. channel_id == campaign. channel . id ( ) )
314
287
} ) ;
315
288
316
289
let campaigns = if campaigns_by_earner. len ( )
@@ -363,7 +336,7 @@ async fn apply_targeting(
363
336
show : true ,
364
337
boost : 1.0 ,
365
338
// only "IMPRESSION" event can be used for this `Output`
366
- price : vec ! [ ( IMPRESSION . into ( ) , pricing_bounds. min) ]
339
+ price : [ ( IMPRESSION , pricing_bounds. min ) ]
367
340
. into_iter ( )
368
341
. collect ( ) ,
369
342
} ;
0 commit comments