@@ -42,7 +42,9 @@ pub struct ClientBuilder {
42
42
impl ClientBuilder {
43
43
/// Set a HTTP header on the SSE request.
44
44
pub fn header ( mut self , key : & ' static str , value : & str ) -> Result < ClientBuilder > {
45
- let value = value. parse ( ) . map_err ( |e| Error :: HttpRequest ( Box :: new ( e) ) ) ?;
45
+ let value = value
46
+ . parse ( )
47
+ . map_err ( |e| Error :: InvalidParameter ( Box :: new ( e) ) ) ?;
46
48
self . headers . insert ( key, value) ;
47
49
Ok ( self )
48
50
}
@@ -113,7 +115,9 @@ impl Client<()> {
113
115
/// [`ClientBuilder`]: struct.ClientBuilder.html
114
116
/// [`.stream()`]: #method.stream
115
117
pub fn for_url ( url : & str ) -> Result < ClientBuilder > {
116
- let url = url. parse ( ) . map_err ( |e| Error :: HttpRequest ( Box :: new ( e) ) ) ?;
118
+ let url = url
119
+ . parse ( )
120
+ . map_err ( |e| Error :: InvalidParameter ( Box :: new ( e) ) ) ?;
117
121
Ok ( ClientBuilder {
118
122
url,
119
123
headers : HeaderMap :: new ( ) ,
@@ -205,7 +209,7 @@ impl<C> ReconnectingRequest<C> {
205
209
* request. headers_mut ( ) . unwrap ( ) = self . props . headers . clone ( ) ;
206
210
let request = request
207
211
. body ( Body :: empty ( ) )
208
- . map_err ( |e| Error :: HttpRequest ( Box :: new ( e) ) ) ?;
212
+ . map_err ( |e| Error :: InvalidParameter ( Box :: new ( e) ) ) ?;
209
213
Ok ( self . http . request ( request) )
210
214
}
211
215
@@ -257,40 +261,50 @@ where
257
261
loop {
258
262
trace ! ( "ReconnectingRequest::poll loop({:?})" , & self . state) ;
259
263
260
- let state = self . as_mut ( ) . project ( ) . state . project ( ) ;
261
- let new_state = match state {
264
+ let this = self . as_mut ( ) . project ( ) ;
265
+ let state = this. state . project ( ) ;
266
+
267
+ match state {
262
268
// New immediately transitions to Connecting, and exists only
263
269
// to ensure that we only connect when polled.
264
270
StateProj :: New => {
265
271
let resp = match self . send_request ( ) {
266
272
Err ( e) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
267
273
Ok ( r) => r,
268
274
} ;
269
- State :: Connecting {
270
- resp,
271
- retry : self . props . reconnect_opts . retry_initial ,
272
- }
275
+ let retry = self . props . reconnect_opts . retry_initial ;
276
+ self . as_mut ( )
277
+ . project ( )
278
+ . state
279
+ . set ( State :: Connecting { resp, retry } )
273
280
}
274
281
StateProj :: Connecting { retry, resp } => match ready ! ( resp. poll( cx) ) {
275
282
Ok ( resp) => {
276
283
debug ! ( "HTTP response: {:#?}" , resp) ;
277
284
278
285
if !resp. status ( ) . is_success ( ) {
279
- let e = StatusError {
280
- status : resp. status ( ) ,
281
- } ;
282
- return Poll :: Ready ( Some ( Err ( Error :: HttpRequest ( Box :: new ( e) ) ) ) ) ;
286
+ self . as_mut ( ) . project ( ) . state . set ( State :: New ) ;
287
+ return Poll :: Ready ( Some ( Err ( Error :: HttpRequest ( resp. status ( ) ) ) ) ) ;
283
288
}
284
289
285
290
self . as_mut ( ) . reset_backoff ( ) ;
286
- State :: Connected ( resp. into_body ( ) )
291
+ self . as_mut ( )
292
+ . project ( )
293
+ . state
294
+ . set ( State :: Connected ( resp. into_body ( ) ) )
287
295
}
288
296
Err ( e) => {
289
297
warn ! ( "request returned an error: {}" , e) ;
290
298
if !* retry {
299
+ self . as_mut ( ) . project ( ) . state . set ( State :: New ) ;
291
300
return Poll :: Ready ( Some ( Err ( Error :: HttpStream ( Box :: new ( e) ) ) ) ) ;
292
301
}
293
- State :: WaitingToReconnect ( delay ( self . as_mut ( ) . backoff ( ) , "retrying" ) )
302
+
303
+ let duration = self . as_mut ( ) . backoff ( ) ;
304
+ self . as_mut ( )
305
+ . project ( )
306
+ . state
307
+ . set ( State :: WaitingToReconnect ( delay ( duration, "retrying" ) ) )
294
308
}
295
309
} ,
296
310
StateProj :: Connected ( body) => match ready ! ( body. poll_data( cx) ) {
@@ -300,10 +314,11 @@ where
300
314
res => {
301
315
// reconnect
302
316
if self . props . reconnect_opts . reconnect {
303
- State :: WaitingToReconnect ( delay (
304
- self . as_mut ( ) . backoff ( ) ,
305
- "reconnecting" ,
306
- ) )
317
+ let duration = self . as_mut ( ) . backoff ( ) ;
318
+ self . as_mut ( )
319
+ . project ( )
320
+ . state
321
+ . set ( State :: WaitingToReconnect ( delay ( duration, "reconnecting" ) ) )
307
322
} else {
308
323
return Poll :: Ready (
309
324
res. map ( |r| r. map_err ( |e| Error :: HttpStream ( Box :: new ( e) ) ) ) ,
@@ -315,10 +330,12 @@ where
315
330
ready ! ( delay. poll( cx) ) ;
316
331
info ! ( "Reconnecting" ) ;
317
332
let resp = self . send_request ( ) ?;
318
- State :: Connecting { retry : true , resp }
333
+ self . as_mut ( )
334
+ . project ( )
335
+ . state
336
+ . set ( State :: Connecting { retry : true , resp } )
319
337
}
320
338
} ;
321
- self . as_mut ( ) . project ( ) . state . set ( new_state) ;
322
339
}
323
340
}
324
341
}
0 commit comments