@@ -9,22 +9,31 @@ use tokio_stream::StreamExt;
9
9
use tower:: { service_fn, MakeService , Service , ServiceExt } ;
10
10
use tracing:: { error, trace} ;
11
11
12
- use crate :: { logs:: * , requests, Error , ExtensionError , LambdaEvent , NextEvent } ;
12
+ use crate :: {
13
+ logs:: * ,
14
+ requests:: { self , Api } ,
15
+ telemetry_wrapper, Error , ExtensionError , LambdaEvent , LambdaTelemetry , NextEvent ,
16
+ } ;
13
17
14
18
const DEFAULT_LOG_PORT_NUMBER : u16 = 9002 ;
19
+ const DEFAULT_TELEMETRY_PORT_NUMBER : u16 = 9003 ;
15
20
16
- /// An Extension that runs event and log processors
17
- pub struct Extension < ' a , E , L > {
21
+ /// An Extension that runs event, log and telemetry processors
22
+ pub struct Extension < ' a , E , L , T > {
18
23
extension_name : Option < & ' a str > ,
19
24
events : Option < & ' a [ & ' a str ] > ,
20
25
events_processor : E ,
21
26
log_types : Option < & ' a [ & ' a str ] > ,
22
27
logs_processor : Option < L > ,
23
28
log_buffering : Option < LogBuffering > ,
24
29
log_port_number : u16 ,
30
+ telemetry_types : Option < & ' a [ & ' a str ] > ,
31
+ telemetry_processor : Option < T > ,
32
+ telemetry_buffering : Option < LogBuffering > ,
33
+ telemetry_port_number : u16 ,
25
34
}
26
35
27
- impl < ' a > Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > > {
36
+ impl < ' a > Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > , MakeIdentity < Vec < LambdaTelemetry > > > {
28
37
/// Create a new base [`Extension`] with a no-op events processor
29
38
pub fn new ( ) -> Self {
30
39
Extension {
@@ -35,17 +44,23 @@ impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
35
44
log_buffering : None ,
36
45
logs_processor : None ,
37
46
log_port_number : DEFAULT_LOG_PORT_NUMBER ,
47
+ telemetry_types : None ,
48
+ telemetry_buffering : None ,
49
+ telemetry_processor : None ,
50
+ telemetry_port_number : DEFAULT_TELEMETRY_PORT_NUMBER ,
38
51
}
39
52
}
40
53
}
41
54
42
- impl < ' a > Default for Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > > {
55
+ impl < ' a > Default
56
+ for Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > , MakeIdentity < Vec < LambdaTelemetry > > >
57
+ {
43
58
fn default ( ) -> Self {
44
59
Self :: new ( )
45
60
}
46
61
}
47
62
48
- impl < ' a , E , L > Extension < ' a , E , L >
63
+ impl < ' a , E , L , T > Extension < ' a , E , L , T >
49
64
where
50
65
E : Service < LambdaEvent > ,
51
66
E :: Future : Future < Output = Result < ( ) , E :: Error > > ,
58
73
L :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
59
74
L :: MakeError : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
60
75
L :: Future : Send ,
76
+
77
+ // Fixme: 'static bound might be too restrictive
78
+ T : MakeService < ( ) , Vec < LambdaTelemetry > , Response = ( ) > + Send + Sync + ' static ,
79
+ T :: Service : Service < Vec < LambdaTelemetry > , Response = ( ) > + Send + Sync ,
80
+ <T :: Service as Service < Vec < LambdaTelemetry > > >:: Future : Send + ' a ,
81
+ T :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
82
+ T :: MakeError : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
83
+ T :: Future : Send ,
61
84
{
62
85
/// Create a new [`Extension`] with a given extension name
63
86
pub fn with_extension_name ( self , extension_name : & ' a str ) -> Self {
77
100
}
78
101
79
102
/// Create a new [`Extension`] with a service that receives Lambda events.
80
- pub fn with_events_processor < N > ( self , ep : N ) -> Extension < ' a , N , L >
103
+ pub fn with_events_processor < N > ( self , ep : N ) -> Extension < ' a , N , L , T >
81
104
where
82
105
N : Service < LambdaEvent > ,
83
106
N :: Future : Future < Output = Result < ( ) , N :: Error > > ,
@@ -91,11 +114,15 @@ where
91
114
log_buffering : self . log_buffering ,
92
115
logs_processor : self . logs_processor ,
93
116
log_port_number : self . log_port_number ,
117
+ telemetry_types : self . telemetry_types ,
118
+ telemetry_buffering : self . telemetry_buffering ,
119
+ telemetry_processor : self . telemetry_processor ,
120
+ telemetry_port_number : self . telemetry_port_number ,
94
121
}
95
122
}
96
123
97
124
/// Create a new [`Extension`] with a service that receives Lambda logs.
98
- pub fn with_logs_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , N >
125
+ pub fn with_logs_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , N , T >
99
126
where
100
127
N : Service < ( ) > ,
101
128
N :: Future : Future < Output = Result < NS , N :: Error > > ,
@@ -109,6 +136,10 @@ where
109
136
log_types : self . log_types ,
110
137
log_buffering : self . log_buffering ,
111
138
log_port_number : self . log_port_number ,
139
+ telemetry_types : self . telemetry_types ,
140
+ telemetry_buffering : self . telemetry_buffering ,
141
+ telemetry_processor : self . telemetry_processor ,
142
+ telemetry_port_number : self . telemetry_port_number ,
112
143
}
113
144
}
114
145
@@ -137,6 +168,53 @@ where
137
168
}
138
169
}
139
170
171
+ /// Create a new [`Extension`] with a service that receives Lambda telemetry data.
172
+ pub fn with_telemetry_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , L , N >
173
+ where
174
+ N : Service < ( ) > ,
175
+ N :: Future : Future < Output = Result < NS , N :: Error > > ,
176
+ N :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Display ,
177
+ {
178
+ Extension {
179
+ telemetry_processor : Some ( lp) ,
180
+ events_processor : self . events_processor ,
181
+ extension_name : self . extension_name ,
182
+ events : self . events ,
183
+ log_types : self . log_types ,
184
+ log_buffering : self . log_buffering ,
185
+ logs_processor : self . logs_processor ,
186
+ log_port_number : self . log_port_number ,
187
+ telemetry_types : self . telemetry_types ,
188
+ telemetry_buffering : self . telemetry_buffering ,
189
+ telemetry_port_number : self . telemetry_port_number ,
190
+ }
191
+ }
192
+
193
+ /// Create a new [`Extension`] with a list of telemetry types to subscribe.
194
+ /// The only accepted telemetry types are `function`, `platform`, and `extension`.
195
+ pub fn with_telemetry_types ( self , telemetry_types : & ' a [ & ' a str ] ) -> Self {
196
+ Extension {
197
+ telemetry_types : Some ( telemetry_types) ,
198
+ ..self
199
+ }
200
+ }
201
+
202
+ /// Create a new [`Extension`] with specific configuration to buffer telemetry.
203
+ pub fn with_telemetry_buffering ( self , lb : LogBuffering ) -> Self {
204
+ Extension {
205
+ telemetry_buffering : Some ( lb) ,
206
+ ..self
207
+ }
208
+ }
209
+
210
+ /// Create a new [`Extension`] with a different port number to listen to telemetry.
211
+ pub fn with_telemetry_port_number ( self , port_number : u16 ) -> Self {
212
+ Extension {
213
+ telemetry_port_number : port_number,
214
+ ..self
215
+ }
216
+ }
217
+
140
218
/// Execute the given extension
141
219
pub async fn run ( self ) -> Result < ( ) , Error > {
142
220
let client = & Client :: builder ( ) . build ( ) ?;
@@ -166,7 +244,8 @@ where
166
244
trace ! ( "Log processor started" ) ;
167
245
168
246
// Call Logs API to start receiving events
169
- let req = requests:: subscribe_logs_request (
247
+ let req = requests:: subscribe_request (
248
+ Api :: LogsApi ,
170
249
extension_id,
171
250
self . log_types ,
172
251
self . log_buffering ,
@@ -179,6 +258,41 @@ where
179
258
trace ! ( "Registered extension with Logs API" ) ;
180
259
}
181
260
261
+ if let Some ( mut telemetry_processor) = self . telemetry_processor {
262
+ trace ! ( "Telemetry processor found" ) ;
263
+ // Spawn task to run processor
264
+ let addr = SocketAddr :: from ( ( [ 0 , 0 , 0 , 0 ] , self . telemetry_port_number ) ) ;
265
+ let make_service = service_fn ( move |_socket : & AddrStream | {
266
+ trace ! ( "Creating new telemetry processor Service" ) ;
267
+ let service = telemetry_processor. make_service ( ( ) ) ;
268
+ async move {
269
+ let service = Arc :: new ( Mutex :: new ( service. await ?) ) ;
270
+ Ok :: < _ , T :: MakeError > ( service_fn ( move |req| telemetry_wrapper ( service. clone ( ) , req) ) )
271
+ }
272
+ } ) ;
273
+ let server = Server :: bind ( & addr) . serve ( make_service) ;
274
+ tokio:: spawn ( async move {
275
+ if let Err ( e) = server. await {
276
+ error ! ( "Error while running telemetry processor: {}" , e) ;
277
+ }
278
+ } ) ;
279
+ trace ! ( "Telemetry processor started" ) ;
280
+
281
+ // Call Telemetry API to start receiving events
282
+ let req = requests:: subscribe_request (
283
+ Api :: TelemetryApi ,
284
+ extension_id,
285
+ self . telemetry_types ,
286
+ self . telemetry_buffering ,
287
+ self . telemetry_port_number ,
288
+ ) ?;
289
+ let res = client. call ( req) . await ?;
290
+ if res. status ( ) != http:: StatusCode :: OK {
291
+ return Err ( ExtensionError :: boxed ( "unable to initialize the telemetry api" ) ) ;
292
+ }
293
+ trace ! ( "Registered extension with Telemetry API" ) ;
294
+ }
295
+
182
296
let incoming = async_stream:: stream! {
183
297
loop {
184
298
trace!( "Waiting for next event (incoming loop)" ) ;
0 commit comments