Skip to content

Commit 4b21349

Browse files
authored
Merge branch 'main' into main
2 parents 8a8c24b + 812b35b commit 4b21349

File tree

16 files changed

+487
-318
lines changed

16 files changed

+487
-318
lines changed

src/common/base/src/base/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod profiling;
1818
mod progress;
1919
mod runtime;
2020
mod runtime_tracker;
21+
mod select;
2122
mod shutdown_signal;
2223
mod singleton_instance;
2324
mod stop_handle;
@@ -37,6 +38,8 @@ pub use runtime::Runtime;
3738
pub use runtime::TrySpawn;
3839
pub use runtime_tracker::RuntimeTracker;
3940
pub use runtime_tracker::ThreadTracker;
41+
pub use select::select3;
42+
pub use select::Select3Output;
4043
pub use shutdown_signal::signal_stream;
4144
pub use shutdown_signal::DummySignalStream;
4245
pub use shutdown_signal::SignalStream;

src/common/base/src/base/select.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::future::Future;
16+
use std::pin::Pin;
17+
use std::task::Context;
18+
use std::task::Poll;
19+
20+
use futures::FutureExt;
21+
22+
pub fn select3<A, B, C>(fut1: A, fut2: B, fut3: C) -> Select3<A, B, C>
23+
where
24+
A: Future + Unpin,
25+
B: Future + Unpin,
26+
C: Future + Unpin,
27+
{
28+
Select3 {
29+
inner: Some((fut1, fut2, fut3)),
30+
}
31+
}
32+
33+
pub struct Select3<A, B, C> {
34+
inner: Option<(A, B, C)>,
35+
}
36+
37+
impl<A: Unpin, B: Unpin, C: Unpin> Unpin for Select3<A, B, C> {}
38+
39+
impl<A, B, C> Future for Select3<A, B, C>
40+
where
41+
A: Future + Unpin,
42+
B: Future + Unpin,
43+
C: Future + Unpin,
44+
{
45+
type Output = Select3Output<(A::Output, B, C), (B::Output, A, C), (C::Output, A, B)>;
46+
47+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48+
let (mut a, mut b, mut c) = self.inner.take().expect("cannot poll Select twice");
49+
50+
if let Poll::Ready(val) = a.poll_unpin(cx) {
51+
return Poll::Ready(Select3Output::Left((val, b, c)));
52+
}
53+
54+
if let Poll::Ready(val) = b.poll_unpin(cx) {
55+
return Poll::Ready(Select3Output::Middle((val, a, c)));
56+
}
57+
58+
if let Poll::Ready(val) = c.poll_unpin(cx) {
59+
return Poll::Ready(Select3Output::Right((val, a, b)));
60+
}
61+
62+
self.inner = Some((a, b, c));
63+
Poll::Pending
64+
}
65+
}
66+
67+
pub enum Select3Output<A, B, C> {
68+
Left(A),
69+
Middle(B),
70+
Right(C),
71+
}

src/query/service/src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub use rpc::FragmentPayload;
3030
pub use rpc::FragmentPlanPacket;
3131
pub use rpc::InitNodesChannelPacket;
3232
pub use rpc::MergeExchange;
33+
pub use rpc::PrecommitBlock;
3334
pub use rpc::QueryFragmentsPlanPacket;
3435
pub use rpc::ServerFlightExchange;
3536
pub use rpc::ShuffleDataExchange;

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use crate::api::rpc::flight_client::FlightExchange;
4040
use crate::api::rpc::flight_scatter_broadcast::BroadcastFlightScatter;
4141
use crate::api::rpc::flight_scatter_hash::HashFlightScatter;
4242
use crate::api::rpc::flight_scatter_hash_v2::HashFlightScatterV2;
43-
use crate::api::rpc::packets::DataPacket;
4443
use crate::api::rpc::Packet;
4544
use crate::api::DataExchange;
4645
use crate::api::FlightClient;
@@ -294,20 +293,14 @@ impl DataExchangeManager {
294293
let query_id = ctx.get_id();
295294
let mut statistics_receiver = statistics_receiver.lock();
296295

297-
if let Some(error) = may_error {
298-
// TODO: try get recv error if network error.
299-
statistics_receiver.shutdown();
300-
ctx.get_exchange_manager().on_finished_query(&query_id);
301-
302-
if error.code() == common_exception::ABORT_QUERY {
303-
statistics_receiver.take_receive_error()?;
304-
}
296+
statistics_receiver.shutdown();
297+
ctx.get_exchange_manager().on_finished_query(&query_id);
298+
statistics_receiver.wait_shutdown()?;
305299

306-
return Err(error.clone());
300+
match may_error {
301+
None => Ok(()),
302+
Some(error_code) => Err(error_code.clone()),
307303
}
308-
309-
ctx.get_exchange_manager().on_finished_query(&query_id);
310-
statistics_receiver.wait_shutdown()
311304
});
312305

313306
Ok(build_res)
@@ -584,39 +577,12 @@ impl QueryCoordinator {
584577
}
585578

586579
let ctx = query_ctx.clone();
587-
let request_server_exchange = request_server_exchanges[0].clone();
588580
let mut statistics_sender =
589-
StatisticsSender::create(&query_id, ctx, request_server_exchange);
581+
StatisticsSender::create(&query_id, ctx, request_server_exchanges.remove(0));
590582
statistics_sender.start();
591583

592584
Thread::named_spawn(Some(String::from("Distributed-Executor")), move || {
593-
if let Err(cause) = executor.execute() {
594-
let request_server_exchange = request_server_exchanges.remove(0);
595-
596-
futures::executor::block_on(async move {
597-
statistics_sender.shutdown();
598-
599-
if let Err(_cause) = request_server_exchange
600-
.send(DataPacket::ErrorCode(cause))
601-
.await
602-
{
603-
tracing::warn!(
604-
"Cannot send message to request server when executor failure."
605-
);
606-
};
607-
608-
// Wait request server close channel.
609-
// request_server_exchange.close_output();
610-
// while let Ok(Some(_data)) = request_server_exchange.recv().await {}
611-
});
612-
613-
return;
614-
}
615-
616-
// TODO: destroy query resource when panic?
617-
// Destroy coordinator_server_exchange if executor is not failure.
618-
statistics_sender.shutdown();
619-
drop(request_server_exchanges);
585+
statistics_sender.shutdown(executor.execute().err());
620586
query_ctx
621587
.get_exchange_manager()
622588
.on_finished_query(&query_id);

src/query/service/src/api/rpc/exchange/exchange_transform.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ use crate::api::rpc::exchange::exchange_params::SerializeParams;
3535
use crate::api::rpc::exchange::exchange_params::ShuffleExchangeParams;
3636
use crate::api::rpc::exchange::exchange_transform_source::ExchangeSourceTransform;
3737
use crate::api::rpc::flight_client::FlightExchange;
38-
use crate::api::rpc::packets::PrecommitBlock;
39-
use crate::api::rpc::packets::ProgressInfo;
4038
use crate::api::DataPacket;
4139
use crate::api::FragmentData;
4240
use crate::clusters::ClusterHelper;
@@ -256,9 +254,9 @@ impl Processor for ExchangeTransform {
256254
if let Some(remote_data) = self.remote_data.take() {
257255
return match remote_data {
258256
DataPacket::ErrorCode(v) => self.on_recv_error(v),
259-
DataPacket::Progress(v) => self.on_recv_progress(v),
257+
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
258+
DataPacket::FetchProgressAndPrecommit => unreachable!(),
260259
DataPacket::FragmentData(v) => self.on_recv_data(v),
261-
DataPacket::PrecommitBlock(v) => self.on_recv_precommit(v),
262260
};
263261
}
264262

@@ -331,12 +329,4 @@ impl ExchangeTransform {
331329

332330
Ok(())
333331
}
334-
335-
fn on_recv_progress(&mut self, _progress: ProgressInfo) -> Result<()> {
336-
unimplemented!()
337-
}
338-
339-
fn on_recv_precommit(&mut self, _fragment_data: PrecommitBlock) -> Result<()> {
340-
unimplemented!()
341-
}
342332
}

src/query/service/src/api/rpc/exchange/exchange_transform_source.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ use common_exception::Result;
2525
use crate::api::rpc::exchange::exchange_params::ExchangeParams;
2626
use crate::api::rpc::exchange::exchange_params::MergeExchangeParams;
2727
use crate::api::rpc::flight_client::FlightExchange;
28-
use crate::api::rpc::packets::PrecommitBlock;
29-
use crate::api::rpc::packets::ProgressInfo;
3028
use crate::api::DataPacket;
3129
use crate::api::FragmentData;
3230
use crate::pipelines::processors::port::InputPort;
@@ -129,9 +127,9 @@ impl Processor for ExchangeSourceTransform {
129127
if let Some(remote_data) = self.remote_flight_data.take() {
130128
return match remote_data {
131129
DataPacket::ErrorCode(v) => self.on_recv_error(v),
132-
DataPacket::Progress(v) => self.on_recv_progress(v),
133130
DataPacket::FragmentData(v) => self.on_recv_data(v),
134-
DataPacket::PrecommitBlock(v) => self.on_recv_precommit(v),
131+
DataPacket::FetchProgressAndPrecommit => unreachable!(),
132+
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
135133
};
136134
}
137135

@@ -178,12 +176,4 @@ impl ExchangeSourceTransform {
178176

179177
Ok(())
180178
}
181-
182-
fn on_recv_progress(&mut self, _progress: ProgressInfo) -> Result<()> {
183-
unimplemented!()
184-
}
185-
186-
fn on_recv_precommit(&mut self, _fragment_data: PrecommitBlock) -> Result<()> {
187-
unimplemented!()
188-
}
189179
}

0 commit comments

Comments
 (0)