Skip to content

Commit ea6f407

Browse files
authored
fix(http/retry): PeekTrailersBody<B> retains first frame (#3947)
see linkerd/linkerd2#14050. this change fixes a logical bug with `linkerd_http_retry::peek_trailers::PeekTrailersBody::<B>::read_body(..)`. `read_body(..)` constructs a `PeekTrailersBody<B>`, by polling the inner body to see whether or not it can reach the end of the stream by only yielding to the asynchronous runtime once. in #3559, we restructured this middleware's internal modeling to reflect the `Frame<T>`-oriented signatures of the `http_body::Body` trait's 1.0 interface. unfortunately, this included a bug which could cause the first frame in a stream to be discarded if the second `Body::poll_frame()` call (_invoked via `now_or_never()`_) returns `Pending`. this could cause non-deterministic errors for users when sending traffic to HTTPRoutes and GRPCRoutes with retry annotations applied. this change rectifies this problem, ensuring that the first frame is not discarded when attempting to peek a body's trailers. to confirm that this works as expected, additional test coverage is introduced that confirms that the data and trailers of the inner body are passed through faithfully. --- * feat(http/retry): additional `PeekTrailersBody<B>` test coverage this commit introduces additional test coverage to `linker_http_retry::peek_trailers::PeekTrailersBody<B>`. this body middleware is used to facilitate transparent http retries, and allows callers to possibly inspect the trailers for a response, by polling an `http_body::Body`. this commit introduces additional unit test coverage that confirms that the data and trailers of the inner body are passed through faithfully. Signed-off-by: katelyn martin <kate@buoyant.io> * feat(http/retry): another `PeekTrailersBody<B>` test case this commit introduces some additional coverage for bodies that return `Pending` when polled a second time. Signed-off-by: katelyn martin <kate@buoyant.io> * fix(http/retry): `PeekTrailersBody<B>` retains first frame this commit fixes a logical bug with `linkerd_http_retry::peek_trailers::PeekTrailersBody::<B>::read_body(..)`. `read_body(..)` constructs a `PeekTrailersBody<B>`, by polling the inner body to see whether or not it can reach the end of the stream by only yielding to the asynchronous runtime once. in #3559, we restructured this middleware's internal modeling to reflect the `Frame<T>`-oriented signatures of the `http_body::Body` trait's 1.0 interface. unfortunately, this included a bug which could cause the first frame in a stream to be discarded if the second `Body::poll_frame()` call (_invoked via `now_or_never()`_) returns `Pending`. this could cause non-deterministic errors for users when sending traffic to HTTPRoutes and GRPCRoutes with retry annotations applied. this commit rectifies this problem, ensuring that the first frame is not discarded when attempting to peek a body's trailers. Signed-off-by: katelyn martin <kate@buoyant.io> --------- Signed-off-by: katelyn martin <kate@buoyant.io>
1 parent c36bef6 commit ea6f407

File tree

1 file changed

+56
-2
lines changed

1 file changed

+56
-2
lines changed

linkerd/http/retry/src/peek_trailers.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl<B: Body> PeekTrailersBody<B> {
182182
},
183183
// The body yielded an unknown kind of frame.
184184
Some(Ok(None)) => Inner::Buffered {
185-
first: None,
185+
first: Some(Ok(first)),
186186
second: None,
187187
inner: body,
188188
},
@@ -192,7 +192,7 @@ impl<B: Body> PeekTrailersBody<B> {
192192
// that a second DATA frame is on the way, and we are no longer willing to
193193
// await additional frames. There are no trailers to peek.
194194
Inner::Buffered {
195-
first: None,
195+
first: Some(Ok(first)),
196196
second: None,
197197
inner: body,
198198
}
@@ -338,6 +338,7 @@ mod tests {
338338
use bytes::Bytes;
339339
use http::{HeaderMap, HeaderValue};
340340
use http_body::Body;
341+
use http_body_util::BodyExt;
341342
use linkerd_error::Error;
342343
use linkerd_mock_http_body::MockBody;
343344
use std::{ops::Not, task::Poll};
@@ -354,13 +355,28 @@ mod tests {
354355
Some(Ok(trls))
355356
}
356357

358+
async fn collect<B>(body: B) -> (Bytes, Option<HeaderMap>)
359+
where
360+
B: Body,
361+
B::Error: std::fmt::Debug,
362+
{
363+
let coll = body.collect().await.expect("can collect");
364+
let trls = coll.trailers().cloned();
365+
let data = coll.to_bytes();
366+
(data, trls)
367+
}
368+
357369
#[tokio::test]
358370
async fn cannot_peek_empty() {
359371
let (_guard, _handle) = linkerd_tracing::test::trace_init();
360372
let empty = MockBody::default();
361373
let peek = PeekTrailersBody::read_body(empty).await;
362374
assert!(peek.peek_trailers().is_none());
363375
assert!(peek.is_end_stream());
376+
377+
let (data, trailers) = collect(peek).await;
378+
assert_eq!(data, "");
379+
assert!(trailers.is_none());
364380
}
365381

366382
#[tokio::test]
@@ -370,6 +386,10 @@ mod tests {
370386
let peek = PeekTrailersBody::read_body(only_trailers).await;
371387
assert!(peek.peek_trailers().is_some());
372388
assert!(peek.is_end_stream().not());
389+
390+
let (data, trailers) = collect(peek).await;
391+
assert_eq!(data, "");
392+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
373393
}
374394

375395
#[tokio::test]
@@ -381,6 +401,10 @@ mod tests {
381401
let peek = PeekTrailersBody::read_body(body).await;
382402
assert!(peek.peek_trailers().is_some());
383403
assert!(peek.is_end_stream().not());
404+
405+
let (data, trailers) = collect(peek).await;
406+
assert_eq!(data, "hello");
407+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
384408
}
385409

386410
#[tokio::test]
@@ -393,6 +417,10 @@ mod tests {
393417
let peek = PeekTrailersBody::read_body(body).await;
394418
assert!(peek.peek_trailers().is_none());
395419
assert!(peek.is_end_stream().not());
420+
421+
let (data, trailers) = collect(peek).await;
422+
assert_eq!(data, "hello");
423+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
396424
}
397425

398426
#[tokio::test]
@@ -405,6 +433,10 @@ mod tests {
405433
let peek = PeekTrailersBody::read_body(body).await;
406434
assert!(peek.peek_trailers().is_some());
407435
assert!(peek.is_end_stream().not());
436+
437+
let (data, trailers) = collect(peek).await;
438+
assert_eq!(data, "hello");
439+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
408440
}
409441

410442
#[tokio::test]
@@ -417,5 +449,27 @@ mod tests {
417449
let peek = PeekTrailersBody::read_body(body).await;
418450
assert!(peek.peek_trailers().is_none());
419451
assert!(peek.is_end_stream().not());
452+
453+
let (data, trailers) = collect(peek).await;
454+
assert_eq!(data, "hellohello");
455+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
456+
}
457+
458+
#[tokio::test]
459+
async fn cannot_peek_body_with_various_pending_polls() {
460+
let (_guard, _handle) = linkerd_tracing::test::trace_init();
461+
let body = MockBody::default()
462+
.then_yield_data(Poll::Ready(data()))
463+
.then_yield_data(Poll::Pending)
464+
.then_yield_data(Poll::Ready(data()))
465+
.then_yield_data(Poll::Pending)
466+
.then_yield_trailer(Poll::Ready(trailers()));
467+
let peek = PeekTrailersBody::read_body(body).await;
468+
assert!(peek.peek_trailers().is_none());
469+
assert!(peek.is_end_stream().not());
470+
471+
let (data, trailers) = collect(peek).await;
472+
assert_eq!(data, "hellohello");
473+
assert_eq!(trailers.unwrap().get("trailer").unwrap(), "shiny");
420474
}
421475
}

0 commit comments

Comments
 (0)