@@ -13,7 +13,7 @@ use std::{
13
13
convert:: { TryFrom , TryInto } ,
14
14
env, fmt,
15
15
future:: Future ,
16
- sync :: Arc ,
16
+ panic ,
17
17
} ;
18
18
use tokio:: io:: { AsyncRead , AsyncWrite } ;
19
19
use tokio_stream:: { Stream , StreamExt } ;
@@ -96,7 +96,7 @@ pub struct HandlerFn<F> {
96
96
impl < F , A , B , Error , Fut > Handler < A , B > for HandlerFn < F >
97
97
where
98
98
F : Fn ( A , Context ) -> Fut ,
99
- Fut : Future < Output = Result < B , Error > > + Send ,
99
+ Fut : Future < Output = Result < B , Error > > ,
100
100
Error : Into < Box < dyn std:: error:: Error + Send + Sync + ' static > > + fmt:: Display ,
101
101
{
102
102
type Error = Error ;
@@ -139,14 +139,13 @@ where
139
139
config : & Config ,
140
140
) -> Result < ( ) , Error >
141
141
where
142
- F : Handler < A , B > + Send + Sync + ' static ,
143
- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
144
- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
145
- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
146
- B : Serialize + Send + Sync + ' static ,
142
+ F : Handler < A , B > ,
143
+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
144
+ <F as Handler < A , B > >:: Error : fmt:: Display ,
145
+ A : for < ' de > Deserialize < ' de > ,
146
+ B : Serialize ,
147
147
{
148
148
let client = & self . client ;
149
- let handler = Arc :: new ( handler) ;
150
149
tokio:: pin!( incoming) ;
151
150
while let Some ( event) = incoming. next ( ) . await {
152
151
trace ! ( "New event arrived (run loop)" ) ;
@@ -159,12 +158,10 @@ where
159
158
trace ! ( "{}" , std:: str :: from_utf8( & body) ?) ; // this may be very verbose
160
159
let body = serde_json:: from_slice ( & body) ?;
161
160
162
- let handler = Arc :: clone ( & handler) ;
163
161
let request_id = & ctx. request_id . clone ( ) ;
164
- #[ allow( clippy:: async_yields_async) ]
165
- let task = tokio:: spawn ( async move { handler. call ( body, ctx) } ) ;
162
+ let task = panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( body, ctx) ) ) ;
166
163
167
- let req = match task. await {
164
+ let req = match task {
168
165
Ok ( response) => match response. await {
169
166
Ok ( response) => {
170
167
trace ! ( "Ok response from handler (run loop)" ) ;
@@ -186,18 +183,21 @@ where
186
183
. into_req ( )
187
184
}
188
185
} ,
189
- Err ( err) if err . is_panic ( ) => {
186
+ Err ( err) => {
190
187
error ! ( "{:?}" , err) ; // inconsistent with other log record formats - to be reviewed
191
188
EventErrorRequest {
192
189
request_id,
193
190
diagnostic : Diagnostic {
194
191
error_type : type_name_of_val ( & err) . to_owned ( ) ,
195
- error_message : format ! ( "Lambda panicked: {}" , err) ,
192
+ error_message : if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
193
+ format ! ( "Lambda panicked: {}" , msg)
194
+ } else {
195
+ "Lambda panicked" . to_string ( )
196
+ } ,
196
197
} ,
197
198
}
198
199
. into_req ( )
199
200
}
200
- Err ( _) => unreachable ! ( "tokio::task should not be canceled" ) ,
201
201
} ;
202
202
let req = req?;
203
203
client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
@@ -296,11 +296,11 @@ where
296
296
/// ```
297
297
pub async fn run < A , B , F > ( handler : F ) -> Result < ( ) , Error >
298
298
where
299
- F : Handler < A , B > + Send + Sync + ' static ,
300
- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
301
- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
302
- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
303
- B : Serialize + Send + Sync + ' static ,
299
+ F : Handler < A , B > ,
300
+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
301
+ <F as Handler < A , B > >:: Error : fmt:: Display ,
302
+ A : for < ' de > Deserialize < ' de > ,
303
+ B : Serialize ,
304
304
{
305
305
trace ! ( "Loading config from env" ) ;
306
306
let config = Config :: from_env ( ) ?;
0 commit comments