Skip to content

Commit 8409f90

Browse files
authored
chore(cluster): disable parallel commit of cluster tasks (#16851)
* chore(cluster): disable parallel commit of cluster tasks * chore(cluster): disable parallel commit of cluster tasks
1 parent fb79f13 commit 8409f90

File tree

1 file changed

+12
-17
lines changed

1 file changed

+12
-17
lines changed

src/query/service/src/clusters/cluster.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -137,28 +137,23 @@ impl ClusterHelper for Cluster {
137137
)))
138138
}
139139

140-
let mut futures = Vec::with_capacity(message.len());
140+
let mut response = HashMap::with_capacity(message.len());
141141
for (id, message) in message {
142142
let node = get_node(&self.nodes, &id)?;
143143

144-
futures.push({
145-
let config = GlobalConfig::instance();
146-
let flight_address = node.flight_address.clone();
147-
let node_secret = node.secret.clone();
148-
149-
async move {
150-
let mut conn = create_client(&config, &flight_address).await?;
151-
Ok::<_, ErrorCode>((
152-
id,
153-
conn.do_action::<_, Res>(path, node_secret, message, timeout)
154-
.await?,
155-
))
156-
}
157-
});
144+
let config = GlobalConfig::instance();
145+
let flight_address = node.flight_address.clone();
146+
let node_secret = node.secret.clone();
147+
148+
let mut conn = create_client(&config, &flight_address).await?;
149+
response.insert(
150+
id,
151+
conn.do_action::<_, Res>(path, node_secret, message, timeout)
152+
.await?,
153+
);
158154
}
159155

160-
let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?;
161-
Ok(responses.into_iter().collect::<HashMap<String, Res>>())
156+
Ok(response)
162157
}
163158
}
164159

0 commit comments

Comments
 (0)