Skip to content

Commit ebeab93

Browse files
authored
fix(storage): Move timeout layer before to ensure cancel safety (#17902)
1 parent 11d9980 commit ebeab93

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

src/common/storage/src/operator.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,8 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
116116
/// Please balance the performance and compile time.
117117
pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
118118
let ob = Operator::new(builder)?
119-
// NOTE
120-
//
121-
// Magic happens here. We will add a layer upon original
122-
// storage operator so that all underlying storage operations
123-
// will send to storage runtime.
124-
.layer(RuntimeLayer::new(GlobalIORuntime::instance()))
125-
.finish();
126-
127-
// Make sure the http client has been updated.
128-
ob.update_http_client(|_| HttpClient::with(StorageHttpClient::default()));
129-
130-
let mut op = ob
119+
// Timeout layer is required to be the first layer so that internal
120+
// futures can be cancelled safely when the timeout is reached.
131121
.layer({
132122
let retry_timeout = env::var("_DATABEND_INTERNAL_RETRY_TIMEOUT")
133123
.ok()
@@ -153,6 +143,18 @@ pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
153143

154144
timeout_layer
155145
})
146+
// NOTE
147+
//
148+
// Magic happens here. We will add a layer upon original
149+
// storage operator so that all underlying storage operations
150+
// will send to storage runtime.
151+
.layer(RuntimeLayer::new(GlobalIORuntime::instance()))
152+
.finish();
153+
154+
// Make sure the http client has been updated.
155+
ob.update_http_client(|_| HttpClient::with(StorageHttpClient::default()));
156+
157+
let mut op = ob
156158
// Add retry
157159
.layer(
158160
RetryLayer::new()

0 commit comments

Comments
 (0)