Skip to content

Commit a33c9e7

Browse files
authored
refactor: upgrade min-meta-service-version to 1.2.677 (#17783)
refactor: upgrade minimum meta-service version to 1.2.677 1. **Upgrade Minimum Meta-Service Version**: - The meta-service version has been upgraded to `1.2.677`, which adds support for the `initial_flush` feature in `WatchRequest`. - This feature is required by `meta-Semaphore` and `meta-Cache`. **Deployment Note**: Before deploying this commit, the databend-meta service must be upgraded to version `1.2.677` or higher. 2. **Fine-Grained Feature Control in `meta-client`**: - The `meta-client` now supports fine-grained control over feature requirements by defining each databend-meta feature with its minimum supported version. - Features are defined as follows: ```rust pub const KV_API: FeatureSpec = ("kv_api", (1, 2, 259)); pub const KV_READ_V1: FeatureSpec = ("kv_read_v1", (1, 2, 259)); pub const TRANSACTION: FeatureSpec = ("transaction", (1, 2, 259)); pub const EXPORT: FeatureSpec = ("export", (1, 2, 259)); pub const EXPORT_V1: FeatureSpec = ("export_v1", (1, 2, 315)); pub const WATCH: FeatureSpec = ("watch", (1, 2, 259)); pub const WATCH_INITIAL_FLUSH: FeatureSpec = ("watch/initial_flush", (1, 2, 677)); pub const MEMBER_LIST: FeatureSpec = ("member_list", (1, 2, 259)); pub const GET_CLUSTER_STATUS: FeatureSpec = ("get_cluster_status", (1, 2, 259)); pub const GET_CLIENT_INFO: FeatureSpec = ("get_client_info", (1, 2, 259)); ``` 3. **New API for Meta Client**: - Introduced `MetaGrpcClient::try_create_with_features()` to create a `MetaGrpcClient` with limited feature requirements. - During the handshake with the databend-meta service, the client verifies the minimum version of each specified feature against the server version. - This approach ensures maximum compatibility and flexibility, allowing clients to work seamlessly with the meta-service while leveraging new features when supported.
1 parent b554eb9 commit a33c9e7

File tree

15 files changed

+1094
-778
lines changed

15 files changed

+1094
-778
lines changed

.github/actions/test_compat_fuse/action.yml

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,22 @@ runs:
55
steps:
66
- uses: ./.github/actions/setup_test
77
with:
8-
path: ./bins/current
8+
path: ./bins/current/bin
99
artifacts: sqllogictests,meta,query
1010

1111
- name: Test compatibility
1212
shell: bash
13-
# test-*.sh <old-query-ver> <meta-ver> <test-suite>
14-
# The `meta-ver` must be compatible with `<old-query-ver>` and the latest version.
15-
#
16-
# `meta-ver` is not the target to test, just choose one that is compatible with `<old-query-ver>` and the latest query.
1713
run: |
18-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.46 1.2.527 base
19-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.241 1.2.527 revoke
20-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.306 1.2.527 rbac
21-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.307 1.2.527 rbac
22-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.318 1.2.527 rbac
23-
bash ./tests/compat_fuse/test_compat_fuse_forward.sh 1.2.307 1.2.527 rbac
24-
bash ./tests/compat_fuse/test_compat_fuse_forward.sh 1.2.318 1.2.527 rbac
25-
bash ./tests/compat_fuse/test_compat_fuse.sh 1.2.680 1.2.680 udf
26-
bash ./tests/compat_fuse/test_compat_fuse_forward.sh 1.2.680 1.2.680 udf
14+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.46 --reader-version current --meta-versions 1.2.527 1.2.677 --logictest-path base
15+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.241 --reader-version current --meta-versions 1.2.527 1.2.677 --logictest-path revoke
16+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.306 --reader-version current --meta-versions 1.2.527 1.2.677 --logictest-path rbac
17+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.307 --reader-version current --meta-versions 1.2.527 1.2.677 --logictest-path rbac
18+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.318 --reader-version current --meta-versions 1.2.527 1.2.677 --logictest-path rbac
19+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version 1.2.680 --reader-version current --meta-versions 1.2.527 1.2.680 --logictest-path udf
20+
21+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version current --reader-version 1.2.307 --meta-versions 1.2.677 --logictest-path rbac
22+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version current --reader-version 1.2.318 --meta-versions 1.2.677 --logictest-path rbac
23+
bash ./tests/compat_fuse/test_compat_fuse.sh --writer-version current --reader-version 1.2.680 --meta-versions 1.2.680 --logictest-path udf
2724
- name: Upload failure
2825
if: failure()
2926
uses: ./.github/actions/artifact_failure

src/meta/binaries/metabench/main.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use databend_common_meta_app::schema::TableCopiedFileNameIdent;
3838
use databend_common_meta_app::schema::TableNameIdent;
3939
use databend_common_meta_app::schema::UpsertTableOptionReq;
4040
use databend_common_meta_app::tenant::Tenant;
41+
use databend_common_meta_client::required;
4142
use databend_common_meta_client::ClientHandle;
4243
use databend_common_meta_client::MetaGrpcClient;
4344
use databend_common_meta_kvapi::kvapi::KVApi;
@@ -124,8 +125,15 @@ async fn main() {
124125
let param = cmd_and_param.get(1).unwrap_or(&"").to_string();
125126

126127
let handle = runtime::spawn(async move {
127-
let client =
128-
MetaGrpcClient::try_create(vec![addr.to_string()], "root", "xxx", None, None, None);
128+
let client = MetaGrpcClient::try_create_with_features(
129+
vec![addr.to_string()],
130+
"root",
131+
"xxx",
132+
None,
133+
None,
134+
None,
135+
required::read_write(),
136+
);
129137

130138
let client = match client {
131139
Ok(client) => client,
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::Duration;
17+
18+
use anyerror::AnyError;
19+
use databend_common_base::containers::ItemManager;
20+
use databend_common_grpc::ConnectionFactory;
21+
use databend_common_grpc::GrpcConnectionError;
22+
use databend_common_grpc::RpcClientTlsConfig;
23+
use databend_common_meta_types::protobuf::meta_service_client::MetaServiceClient;
24+
use databend_common_meta_types::ConnectionError;
25+
use databend_common_meta_types::GrpcConfig;
26+
use databend_common_meta_types::MetaClientError;
27+
use databend_common_meta_types::MetaNetworkError;
28+
use log::info;
29+
use once_cell::sync::OnceCell;
30+
use parking_lot::Mutex;
31+
use tonic::async_trait;
32+
use tonic::transport::Channel;
33+
34+
use crate::endpoints::Endpoints;
35+
use crate::established_client::EstablishedClient;
36+
use crate::grpc_client::AuthInterceptor;
37+
use crate::grpc_client::RealClient;
38+
use crate::FeatureSpec;
39+
use crate::MetaGrpcClient;
40+
use crate::METACLI_COMMIT_SEMVER;
41+
42+
#[derive(Debug)]
43+
pub struct MetaChannelManager {
44+
username: String,
45+
password: String,
46+
timeout: Option<Duration>,
47+
tls_config: Option<RpcClientTlsConfig>,
48+
49+
required_features: &'static [FeatureSpec],
50+
51+
/// The endpoints of the meta-service cluster.
52+
///
53+
/// The endpoints will be added to a built client item
54+
/// and will be updated when a error or successful response is received.
55+
endpoints: Arc<Mutex<Endpoints>>,
56+
}
57+
58+
impl MetaChannelManager {
59+
pub fn new(
60+
username: impl ToString,
61+
password: impl ToString,
62+
timeout: Option<Duration>,
63+
tls_config: Option<RpcClientTlsConfig>,
64+
required_features: &'static [FeatureSpec],
65+
endpoints: Arc<Mutex<Endpoints>>,
66+
) -> Self {
67+
Self {
68+
username: username.to_string(),
69+
password: password.to_string(),
70+
timeout,
71+
tls_config,
72+
required_features,
73+
endpoints,
74+
}
75+
}
76+
77+
#[async_backtrace::framed]
78+
async fn new_established_client(
79+
&self,
80+
addr: &String,
81+
) -> Result<EstablishedClient, MetaClientError> {
82+
let chan = self.build_channel(addr).await?;
83+
84+
let (mut real_client, once) = Self::new_real_client(chan);
85+
86+
info!(
87+
"MetaChannelManager done building RealClient to {}, start handshake",
88+
addr
89+
);
90+
91+
let handshake_res = MetaGrpcClient::handshake(
92+
&mut real_client,
93+
&METACLI_COMMIT_SEMVER,
94+
self.required_features,
95+
&self.username,
96+
&self.password,
97+
)
98+
.await;
99+
100+
info!(
101+
"MetaChannelManager done handshake to {}, result.err(): {:?}",
102+
addr,
103+
handshake_res.as_ref().err()
104+
);
105+
106+
let (token, server_version, features) = handshake_res?;
107+
108+
// Update the token for the client interceptor.
109+
// Safe unwrap(): it is the first time setting it.
110+
once.set(token).unwrap();
111+
112+
Ok(EstablishedClient::new(
113+
real_client,
114+
server_version,
115+
features,
116+
addr,
117+
self.endpoints.clone(),
118+
))
119+
}
120+
121+
/// Create a MetaServiceClient with authentication interceptor
122+
///
123+
/// The returned `OnceCell` is used to fill in a token for the interceptor.
124+
pub fn new_real_client(chan: Channel) -> (RealClient, Arc<OnceCell<Vec<u8>>>) {
125+
let once = Arc::new(OnceCell::new());
126+
127+
let interceptor = AuthInterceptor {
128+
token: once.clone(),
129+
};
130+
131+
let client = MetaServiceClient::with_interceptor(chan, interceptor)
132+
.max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE)
133+
.max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE);
134+
135+
(client, once)
136+
}
137+
138+
#[async_backtrace::framed]
139+
async fn build_channel(&self, addr: &String) -> Result<Channel, MetaNetworkError> {
140+
info!("MetaChannelManager::build_channel to {}", addr);
141+
142+
let ch = ConnectionFactory::create_rpc_channel(addr, self.timeout, self.tls_config.clone())
143+
.await
144+
.map_err(|e| match e {
145+
GrpcConnectionError::InvalidUri { .. } => MetaNetworkError::BadAddressFormat(
146+
AnyError::new(&e).add_context(|| "while creating rpc channel"),
147+
),
148+
GrpcConnectionError::TLSConfigError { .. } => MetaNetworkError::TLSConfigError(
149+
AnyError::new(&e).add_context(|| "while creating rpc channel"),
150+
),
151+
GrpcConnectionError::CannotConnect { .. } => MetaNetworkError::ConnectionError(
152+
ConnectionError::new(e, "while creating rpc channel"),
153+
),
154+
})?;
155+
Ok(ch)
156+
}
157+
}
158+
159+
#[async_trait]
160+
impl ItemManager for MetaChannelManager {
161+
type Key = String;
162+
type Item = EstablishedClient;
163+
type Error = MetaClientError;
164+
165+
#[logcall::logcall(err = "debug")]
166+
#[fastrace::trace]
167+
#[async_backtrace::framed]
168+
async fn build(&self, addr: &Self::Key) -> Result<Self::Item, Self::Error> {
169+
self.new_established_client(addr).await
170+
}
171+
172+
#[logcall::logcall(err = "debug")]
173+
#[fastrace::trace]
174+
#[async_backtrace::framed]
175+
async fn check(&self, ch: Self::Item) -> Result<Self::Item, Self::Error> {
176+
// The underlying `tonic::transport::channel::Channel` reconnects when server is down.
177+
// But we still need to assert the readiness, e.g., when handshake token expires
178+
// If there was an error occurred, the channel will be closed.
179+
if let Some(e) = ch.take_error() {
180+
return Err(MetaNetworkError::from(e).into());
181+
}
182+
Ok(ch)
183+
}
184+
}

0 commit comments

Comments
 (0)