Skip to content

Commit d6891fb

Browse files
committed
feat(cluster): auto-discover IP when the configured IP is unspecified or loopback
1 parent 94f9532 commit d6891fb

File tree

9 files changed

+68
-24
lines changed

9 files changed

+68
-24
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/store/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ impl MetaStore {
5555
MetaStore::R(_) => false,
5656
}
5757
}
58+
59+
/// Returns the local address associated with this meta store connection, if one is available.
///
/// # Returns
/// * `Ok(None)` for the `MetaStore::L` variant (the variant the tests build around an
///   embedded meta store), which has no address to report.
/// * `Ok(Some(addr))` for the `MetaStore::R` variant, where `addr` is the `client_addr`
///   field of the info returned by the client's `get_client_info()` call.
///
/// # Errors
/// Propagates any error from `get_client_info()` via `?`.
///
/// NOTE(review): `client_addr` is presumably this client's address as observed by the
/// remote meta service — confirm against the `get_client_info` implementation.
pub async fn get_local_addr(&self) -> std::result::Result<Option<String>, MetaError> {
60+
match self {
61+
MetaStore::L(_) => Ok(None),
62+
MetaStore::R(grpc_client) => {
63+
let client_info = grpc_client.get_client_info().await?;
64+
Ok(Some(client_info.client_addr))
65+
}
66+
}
67+
}
5868
}
5969

6070
#[async_trait::async_trait]

src/query/management/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ common-meta-api = { path = "../../meta/api" }
2222
common-meta-types = { path = "../../meta/types" }
2323
common-proto-conv = { path = "../../meta/proto-conv" }
2424
common-protos = { path = "../../meta/protos" }
25+
common-meta-store = { path = "../../meta/store" }
2526

2627
async-trait = "0.1.56"
2728
serde_json = "1.0.81"

src/query/management/src/cluster/cluster_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ pub trait ClusterApi: Sync + Send {
2828

2929
// Keep the tenant's cluster node alive.
3030
async fn heartbeat(&self, node: &NodeInfo, seq: Option<u64>) -> Result<u64>;
31+
32+
// Get the local address of this node, or `None` if the implementation cannot provide one.
async fn get_local_addr(&self) -> Result<Option<String>>;
3133
}

src/query/management/src/cluster/cluster_mgr.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::ops::Add;
16-
use std::sync::Arc;
1716
use std::time::Duration;
1817
use std::time::UNIX_EPOCH;
1918

@@ -22,6 +21,7 @@ use common_base::base::unescape_for_key;
2221
use common_exception::ErrorCode;
2322
use common_exception::Result;
2423
use common_meta_api::KVApi;
24+
use common_meta_store::MetaStore;
2525
use common_meta_types::KVMeta;
2626
use common_meta_types::MatchSeq;
2727
use common_meta_types::NodeInfo;
@@ -35,14 +35,14 @@ use crate::cluster::ClusterApi;
3535
pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters";
3636

3737
pub struct ClusterMgr {
38-
kv_api: Arc<dyn KVApi>,
38+
metastore: MetaStore,
3939
lift_time: Duration,
4040
cluster_prefix: String,
4141
}
4242

4343
impl ClusterMgr {
4444
pub fn create(
45-
kv_api: Arc<dyn KVApi>,
45+
metastore: MetaStore,
4646
tenant: &str,
4747
cluster_id: &str,
4848
lift_time: Duration,
@@ -54,7 +54,7 @@ impl ClusterMgr {
5454
}
5555

5656
Ok(ClusterMgr {
57-
kv_api,
57+
metastore,
5858
lift_time,
5959
cluster_prefix: format!(
6060
"{}/{}/{}/databend_query",
@@ -87,7 +87,7 @@ impl ClusterApi for ClusterMgr {
8787
let value = Operation::Update(serde_json::to_vec(&node)?);
8888
let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node.id)?);
8989
let upsert_node = self
90-
.kv_api
90+
.metastore
9191
.upsert_kv(UpsertKVReq::new(&node_key, seq, value, meta));
9292

9393
let res = upsert_node.await?.added_or_else(|v| {
@@ -101,7 +101,7 @@ impl ClusterApi for ClusterMgr {
101101
}
102102

103103
async fn get_nodes(&self) -> Result<Vec<NodeInfo>> {
104-
let values = self.kv_api.prefix_list_kv(&self.cluster_prefix).await?;
104+
let values = self.metastore.prefix_list_kv(&self.cluster_prefix).await?;
105105

106106
let mut nodes_info = Vec::with_capacity(values.len());
107107
for (node_key, value) in values {
@@ -116,7 +116,7 @@ impl ClusterApi for ClusterMgr {
116116

117117
async fn drop_node(&self, node_id: String, seq: Option<u64>) -> Result<()> {
118118
let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node_id)?);
119-
let upsert_node = self.kv_api.upsert_kv(UpsertKVReq::new(
119+
let upsert_node = self.metastore.upsert_kv(UpsertKVReq::new(
120120
&node_key,
121121
seq.into(),
122122
Operation::Delete,
@@ -145,7 +145,7 @@ impl ClusterApi for ClusterMgr {
145145
};
146146

147147
let upsert_meta =
148-
self.kv_api
148+
self.metastore
149149
.upsert_kv(UpsertKVReq::new(&node_key, seq, Operation::AsIs, meta));
150150

151151
match upsert_meta.await? {
@@ -157,4 +157,8 @@ impl ClusterApi for ClusterMgr {
157157
UpsertKVReply { .. } => self.add_node(node.clone()).await,
158158
}
159159
}
160+
161+
// Delegates to the underlying `MetaStore::get_local_addr`. The `?` converts the
// meta store's error into this crate's `Result` error type before re-wrapping in `Ok`.
async fn get_local_addr(&self) -> Result<Option<String>> {
162+
Ok(self.metastore.get_local_addr().await?)
163+
}
160164
}

src/query/management/tests/it/cluster.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common_exception::Result;
2121
use common_management::*;
2222
use common_meta_api::KVApi;
2323
use common_meta_embedded::MetaEmbedded;
24+
use common_meta_store::MetaStore;
2425
use common_meta_types::NodeInfo;
2526
use common_meta_types::SeqV;
2627

@@ -153,8 +154,8 @@ fn create_test_node_info() -> NodeInfo {
153154
}
154155
}
155156

156-
async fn new_cluster_api() -> Result<(Arc<MetaEmbedded>, ClusterMgr)> {
157-
let test_api = Arc::new(MetaEmbedded::new_temp().await?);
157+
async fn new_cluster_api() -> Result<(MetaStore, ClusterMgr)> {
158+
let test_api = MetaStore::L(Arc::new(MetaEmbedded::new_temp().await?));
158159
let cluster_manager = ClusterMgr::create(
159160
test_api.clone(),
160161
"test-tenant-id",

src/query/service/src/clusters/cluster.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::net::SocketAddr;
1516
use std::ops::RangeInclusive;
17+
use std::str::FromStr;
1618
use std::sync::atomic::AtomicBool;
1719
use std::sync::atomic::Ordering;
1820
use std::sync::Arc;
@@ -35,7 +37,7 @@ use common_exception::Result;
3537
use common_grpc::ConnectionFactory;
3638
use common_management::ClusterApi;
3739
use common_management::ClusterMgr;
38-
use common_meta_api::KVApi;
40+
use common_meta_store::MetaStore;
3941
use common_meta_store::MetaStoreProvider;
4042
use common_meta_types::NodeInfo;
4143
use common_metrics::label_counter_with_val_and_labels;
@@ -139,24 +141,24 @@ static CLUSTER_DISCOVERY: OnceCell<Singleton<Arc<ClusterDiscovery>>> = OnceCell:
139141
impl ClusterDiscovery {
140142
const METRIC_LABEL_FUNCTION: &'static str = "function";
141143

142-
pub async fn create_meta_client(cfg: &Config) -> Result<Arc<dyn KVApi>> {
144+
pub async fn create_meta_client(cfg: &Config) -> Result<MetaStore> {
143145
let meta_api_provider = MetaStoreProvider::new(cfg.meta.to_meta_grpc_client_conf());
144146
match meta_api_provider.try_get_meta_store().await {
145-
Ok(client) => Ok(client.arc()),
147+
Ok(meta_store) => Ok(meta_store),
146148
Err(cause) => Err(cause.add_message_back("(while create cluster api).")),
147149
}
148150
}
149151

150152
pub async fn init(cfg: Config, v: Singleton<Arc<ClusterDiscovery>>) -> Result<()> {
151-
let meta_client = ClusterDiscovery::create_meta_client(&cfg).await?;
152-
v.init(Self::try_create(&cfg, meta_client).await?)?;
153+
let metastore = ClusterDiscovery::create_meta_client(&cfg).await?;
154+
v.init(Self::try_create(&cfg, metastore).await?)?;
153155

154156
CLUSTER_DISCOVERY.set(v).ok();
155157
Ok(())
156158
}
157159

158-
pub async fn try_create(cfg: &Config, api: Arc<dyn KVApi>) -> Result<Arc<ClusterDiscovery>> {
159-
let (lift_time, provider) = Self::create_provider(cfg, api)?;
160+
pub async fn try_create(cfg: &Config, metastore: MetaStore) -> Result<Arc<ClusterDiscovery>> {
161+
let (lift_time, provider) = Self::create_provider(cfg, metastore)?;
160162

161163
Ok(Arc::new(ClusterDiscovery {
162164
local_id: GlobalUniqName::unique(),
@@ -182,13 +184,13 @@ impl ClusterDiscovery {
182184

183185
fn create_provider(
184186
cfg: &Config,
185-
api: Arc<dyn KVApi>,
187+
metastore: MetaStore,
186188
) -> Result<(Duration, Arc<dyn ClusterApi>)> {
187189
// TODO: generate if tenant or cluster id is empty
188190
let tenant_id = &cfg.query.tenant_id;
189191
let cluster_id = &cfg.query.cluster_id;
190192
let lift_time = Duration::from_secs(60);
191-
let cluster_manager = ClusterMgr::create(api, tenant_id, cluster_id, lift_time)?;
193+
let cluster_manager = ClusterMgr::create(metastore, tenant_id, cluster_id, lift_time)?;
192194

193195
Ok((lift_time, Arc::new(cluster_manager)))
194196
}
@@ -350,8 +352,27 @@ impl ClusterDiscovery {
350352

351353
pub async fn register_to_metastore(self: &Arc<Self>, cfg: &Config) -> Result<()> {
352354
let cpus = cfg.query.num_cpus;
353-
// TODO: 127.0.0.1 || ::0
354-
let address = cfg.query.flight_api_address.clone();
355+
let mut address = cfg.query.flight_api_address.clone();
356+
357+
if let Ok(socket_addr) = SocketAddr::from_str(&address) {
358+
let ip_addr = socket_addr.ip();
359+
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
360+
if let Some(local_addr) = self.api_provider.get_local_addr().await? {
361+
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
362+
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
363+
tracing::warn!(
364+
"Used loopback or unspecified address as cluster flight address. \
365+
we rewrite it(\"{}\" -> \"{}\") for other nodes can connect it.\
366+
If your has proxy between nodes, you can specify the node's IP address in the configuration file.",
367+
address,
368+
new_addr
369+
);
370+
371+
address = new_addr;
372+
}
373+
}
374+
}
375+
355376
let node_info = NodeInfo::create(self.local_id.clone(), cpus, address);
356377

357378
self.drop_invalid_nodes(&node_info).await?;

src/query/service/tests/it/clusters.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ async fn test_remove_invalid_nodes() -> Result<()> {
4444
.query_flight_address("invalid_address_2")
4545
.build();
4646

47-
let meta_client = ClusterDiscovery::create_meta_client(&config_1).await?;
48-
let cluster_discovery_1 = ClusterDiscovery::try_create(&config_1, meta_client.clone()).await?;
49-
let cluster_discovery_2 = ClusterDiscovery::try_create(&config_2, meta_client.clone()).await?;
47+
let metastore = ClusterDiscovery::create_meta_client(&config_1).await?;
48+
let cluster_discovery_1 = ClusterDiscovery::try_create(&config_1, metastore.clone()).await?;
49+
let cluster_discovery_2 = ClusterDiscovery::try_create(&config_2, metastore.clone()).await?;
5050

5151
cluster_discovery_1.register_to_metastore(&config_1).await?;
5252
cluster_discovery_2.register_to_metastore(&config_2).await?;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
statement query I
2+
select * from system.clusters where host = '0.0.0.0' or host = '::';
3+
4+
----

0 commit comments

Comments
 (0)