Skip to content

Commit 9a49614

Browse files
committed
feat: Meta-service cluster is gated by Enterprise Edition
By default meta-service disallows clustering. Meta-service cluster is only enabled when raft-config `databend_enterprise_license` is set and is valid. No feature in the jwt claim is examined. The EE gate check when a meta node initiate raft-protocol network instance. Thus without a valid EE token, all raft-protocol are disabled, including `RequestVote`, `AppendEntries`, `InstallSnapshot` and internal request forward. If EE token is not set, an error will be outputed to log file. - New config `databend_enterprise_license`: ``` [raft_config] databend_enterprise_license = "<token>" ``` This token is same as the one used by databend-query. - When testing, a temp key pair and jwt claim is created to pass integration tests, this is enabled by `fake_ee_license` config entry. - Other changes: Add `DisplaySlice` and `DisplayUnixTimeStampExt` to display slice of `Display` instance and unix timestamp.
1 parent da0dfaa commit 9a49614

File tree

13 files changed

+311
-31
lines changed

13 files changed

+311
-31
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
/// Implement `Display` for `&[T]` if T is `Display`.
18+
///
19+
/// It outputs at most `MAX` elements, excluding those from the 5th to the second-to-last one:
20+
/// - `DisplaySlice(&[1,2,3,4,5,6])` outputs: `"[1,2,3,4,...,6]"`.
21+
pub struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]);
22+
23+
impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> {
24+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25+
let slice = self.0;
26+
let len = slice.len();
27+
28+
write!(f, "[")?;
29+
30+
if len > MAX {
31+
for (i, t) in slice[..(MAX - 1)].iter().enumerate() {
32+
if i > 0 {
33+
write!(f, ",")?;
34+
}
35+
36+
write!(f, "{}", t)?;
37+
}
38+
39+
write!(f, ",..,")?;
40+
write!(f, "{}", slice.last().unwrap())?;
41+
} else {
42+
for (i, t) in slice.iter().enumerate() {
43+
if i > 0 {
44+
write!(f, ",")?;
45+
}
46+
47+
write!(f, "{}", t)?;
48+
}
49+
}
50+
51+
write!(f, "]")
52+
}
53+
}
54+
55+
pub trait DisplaySliceExt<'a, T: fmt::Display> {
56+
fn display(&'a self) -> DisplaySlice<'a, T>;
57+
58+
/// Display at most `MAX` elements.
59+
fn display_n<const MAX: usize>(&'a self) -> DisplaySlice<'a, T, MAX>;
60+
}
61+
62+
impl<T> DisplaySliceExt<'_, T> for [T]
63+
where T: fmt::Display
64+
{
65+
fn display(&self) -> DisplaySlice<T> {
66+
DisplaySlice(self)
67+
}
68+
69+
fn display_n<const MAX: usize>(&'_ self) -> DisplaySlice<'_, T, MAX> {
70+
DisplaySlice(self)
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::DisplaySlice;
77+
78+
#[test]
79+
fn test_display_slice() {
80+
let a = vec![1, 2, 3, 4];
81+
assert_eq!("[1,2,3,4]", DisplaySlice::<_>(&a).to_string());
82+
83+
let a = vec![1, 2, 3, 4, 5];
84+
assert_eq!("[1,2,3,4,5]", DisplaySlice::<_>(&a).to_string());
85+
86+
let a = vec![1, 2, 3, 4, 5, 6];
87+
assert_eq!("[1,2,3,4,..,6]", DisplaySlice::<_>(&a).to_string());
88+
89+
let a = vec![1, 2, 3, 4, 5, 6, 7];
90+
assert_eq!("[1,2,3,4,..,7]", DisplaySlice::<_>(&a).to_string());
91+
92+
let a = vec![1, 2, 3, 4, 5, 6, 7];
93+
assert_eq!("[1,..,7]", DisplaySlice::<_, 2>(&a).to_string());
94+
}
95+
}

src/common/base/src/display/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
pub mod display_option;
16+
pub mod display_slice;
1617
pub mod display_unix_epoch;

src/meta/ee/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ doctest = false
1212
test = true
1313

1414
[dependencies]
15+
anyhow = { workspace = true }
16+
databend-common-base = { workspace = true }
17+
databend-common-license = { workspace = true }
18+
jwt-simple = { workspace = true }
19+
log = { workspace = true }
1520

1621
[build-dependencies]
1722
databend-common-building = { workspace = true }

src/meta/ee/src/lib.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,111 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::sync::Mutex;
17+
use std::time::Duration;
18+
19+
use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt;
20+
use databend_common_license::license::LicenseInfo;
21+
use jwt_simple::algorithms::ECDSAP256KeyPairLike;
22+
use jwt_simple::algorithms::ECDSAP256PublicKeyLike;
23+
use jwt_simple::algorithms::ES256KeyPair;
24+
use jwt_simple::algorithms::ES256PublicKey;
25+
use jwt_simple::claims::Claims;
26+
use jwt_simple::claims::JWTClaims;
27+
use jwt_simple::prelude::Clock;
28+
29+
#[derive(Clone, Default)]
30+
pub struct MetaServiceEnterpriseGate {
31+
/// License ES256 signed jwt claim token.
32+
license_token: Arc<Mutex<Option<String>>>,
33+
34+
/// The public key to verify the license token.
35+
public_key: String,
36+
}
37+
38+
impl MetaServiceEnterpriseGate {
39+
const LICENSE_PUBLIC_KEY: &'static str = r#"-----BEGIN PUBLIC KEY-----
40+
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGsKCbhXU7j56VKZ7piDlLXGhud0a
41+
pWjW3wxSdeARerxs/BeoWK7FspDtfLaAT8iJe4YEmR0JpkRQ8foWs0ve3w==
42+
-----END PUBLIC KEY-----"#;
43+
44+
pub fn new(token: Option<String>) -> Self {
45+
Self {
46+
license_token: Arc::new(Mutex::new(token)),
47+
public_key: Self::LICENSE_PUBLIC_KEY.to_string(),
48+
}
49+
}
50+
51+
/// Create with a temp license for testing.
52+
pub fn new_testing() -> Self {
53+
let lic = LicenseInfo {
54+
r#type: Some("trial".to_string()),
55+
org: Some("databend".to_string()),
56+
tenants: Some(vec!["test".to_string()]),
57+
features: None,
58+
};
59+
60+
let key_pair = ES256KeyPair::generate();
61+
let claims = Claims::with_custom_claims(lic, jwt_simple::prelude::Duration::from_hours(2));
62+
let token = key_pair.sign(claims).unwrap();
63+
64+
let public_key = key_pair.public_key().to_pem().unwrap();
65+
66+
Self {
67+
license_token: Arc::new(Mutex::new(Some(token))),
68+
public_key,
69+
}
70+
}
71+
72+
/// Parse the JWT token and restore the claims.
73+
fn parse_jwt_token(&self, raw: &str) -> Result<JWTClaims<LicenseInfo>, anyhow::Error> {
74+
let public_key = ES256PublicKey::from_pem(&self.public_key)?;
75+
76+
let claim = public_key.verify_token::<LicenseInfo>(raw, None)?;
77+
78+
Ok(claim)
79+
}
80+
81+
fn check_license(&self, raw: &str) -> Result<(), anyhow::Error> {
82+
let claim = self.parse_jwt_token(raw)?;
83+
84+
let now = Clock::now_since_epoch();
85+
86+
if Some(now) > claim.expires_at {
87+
let expires_at = claim.expires_at.unwrap_or_default();
88+
let unix_timestamp = Duration::from_millis(expires_at.as_millis());
89+
90+
return Err(anyhow::anyhow!(format!(
91+
"License is expired at: {}",
92+
unix_timestamp.display_unix_timestamp()
93+
)));
94+
}
95+
96+
Ok(())
97+
}
98+
99+
pub fn assert_cluster_enabled(&self) -> Result<(), anyhow::Error> {
100+
let token = {
101+
let x = self.license_token.lock().unwrap();
102+
x.clone()
103+
};
104+
105+
let Some(token) = token.as_ref() else {
106+
log::error!(
107+
"No license set in config `databend_enterprise_license`, clustering is disabled",
108+
);
109+
return Err(anyhow::anyhow!(
110+
"No license set in config `databend_enterprise_license`, clustering is disabled"
111+
));
112+
};
113+
114+
if let Err(e) = self.check_license(token) {
115+
log::error!("Check license failed: {}", e);
116+
return Err(e);
117+
}
118+
119+
Ok(())
120+
}
121+
}

src/meta/raft-store/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ pub struct RaftConfig {
131131

132132
/// Max timeout(in milli seconds) when waiting a cluster leader.
133133
pub wait_leader_timeout: u64,
134+
135+
/// License token in string to enable enterprise features(including: `cluster`)
136+
pub databend_enterprise_license: Option<String>,
137+
138+
/// For test only: whether to fake an enterprise license.
139+
pub fake_ee_license: bool,
134140
}
135141

136142
pub fn get_default_raft_advertise_host() -> String {
@@ -172,6 +178,8 @@ impl Default for RaftConfig {
172178
sled_max_cache_size_mb: 10 * 1024,
173179
cluster_name: "foo_cluster".to_string(),
174180
wait_leader_timeout: 70000,
181+
databend_enterprise_license: None,
182+
fake_ee_license: false,
175183
}
176184
}
177185
}

src/meta/service/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ anyerror = { workspace = true }
2626
anyhow = { workspace = true }
2727
async-trait = { workspace = true }
2828
backon = "0.4"
29+
chrono = { workspace = true }
2930
clap = { workspace = true }
3031
databend-common-arrow = { workspace = true }
3132
databend-common-base = { workspace = true }
3233
databend-common-grpc = { workspace = true }
3334
databend-common-http = { workspace = true }
35+
databend-common-license = { workspace = true }
3436
databend-common-meta-api = { workspace = true }
3537
databend-common-meta-client = { workspace = true }
3638
databend-common-meta-kvapi = { workspace = true }
@@ -40,6 +42,7 @@ databend-common-meta-stoerr = { workspace = true }
4042
databend-common-meta-types = { workspace = true }
4143
databend-common-metrics = { workspace = true }
4244
databend-common-tracing = { workspace = true }
45+
databend-enterprise-meta = { workspace = true }
4346
deepsize = { workspace = true }
4447
derive_more = { workspace = true }
4548
fastrace = { workspace = true }
@@ -48,6 +51,7 @@ futures = { workspace = true }
4851
futures-async-stream = { workspace = true }
4952
http = { workspace = true }
5053
itertools = { workspace = true }
54+
jwt-simple = { workspace = true }
5155
log = { workspace = true }
5256
logcall = { workspace = true }
5357
maplit = "1.0.2"

src/meta/service/src/configs/outer_v0.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ pub struct ConfigViaEnv {
303303
pub sled_tree_prefix: String,
304304
pub sled_max_cache_size_mb: u64,
305305
pub cluster_name: String,
306+
pub databend_enterprise_license: Option<String>,
306307
}
307308

308309
impl Default for ConfigViaEnv {
@@ -356,6 +357,7 @@ impl From<Config> for ConfigViaEnv {
356357
sled_tree_prefix: cfg.raft_config.sled_tree_prefix,
357358
sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb,
358359
cluster_name: cfg.raft_config.cluster_name,
360+
databend_enterprise_license: cfg.raft_config.databend_enterprise_license,
359361
}
360362
}
361363
}
@@ -393,6 +395,7 @@ impl Into<Config> for ConfigViaEnv {
393395
sled_tree_prefix: self.sled_tree_prefix,
394396
sled_max_cache_size_mb: self.sled_max_cache_size_mb,
395397
cluster_name: self.cluster_name,
398+
databend_enterprise_license: self.databend_enterprise_license,
396399
};
397400
let log_config = LogConfig {
398401
file: FileLogConfig {
@@ -564,6 +567,10 @@ pub struct RaftConfig {
564567
/// Max timeout(in milli seconds) when waiting a cluster leader.
565568
#[clap(long, default_value = "180000")]
566569
pub wait_leader_timeout: u64,
570+
571+
/// License token in string to enable enterprise features(including: `cluster`)
572+
#[clap(long, value_name = "VALUE")]
573+
pub databend_enterprise_license: Option<String>,
567574
}
568575

569576
// TODO(rotbl): should not be used.
@@ -602,6 +609,8 @@ impl From<RaftConfig> for InnerRaftConfig {
602609
sled_max_cache_size_mb: x.sled_max_cache_size_mb,
603610
cluster_name: x.cluster_name,
604611
wait_leader_timeout: x.wait_leader_timeout,
612+
databend_enterprise_license: x.databend_enterprise_license,
613+
fake_ee_license: false,
605614
}
606615
}
607616
}
@@ -635,6 +644,7 @@ impl From<InnerRaftConfig> for RaftConfig {
635644
sled_max_cache_size_mb: inner.sled_max_cache_size_mb,
636645
cluster_name: inner.cluster_name,
637646
wait_leader_timeout: inner.wait_leader_timeout,
647+
databend_enterprise_license: inner.databend_enterprise_license,
638648
}
639649
}
640650
}

0 commit comments

Comments
 (0)