Skip to content

Commit c66a12b

Browse files
sollhuiYour Name
authored and
Your Name
committed
[fix](load) disable fetching-meta requests to disable_load or decommissioned BE nodes (#50421)
If a user disables load on a certain node via `ALTER SYSTEM MODIFY BACKEND "xxx" SET ("disable_load" = "true");`, the planner will not select that node for loading, but fetch-meta requests still select it. This PR makes fetch-meta requests skip a backend when disable_load is set.
1 parent 74edb1a commit c66a12b

File tree

3 files changed

+114
-1
lines changed

3 files changed

+114
-1
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,13 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
229229
InternalService.PProxyResult result = null;
230230
try {
231231
while (retryTimes < 3) {
232-
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
232+
List<Long> backendIds = new ArrayList<>();
233+
for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
234+
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
235+
if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
236+
backendIds.add(beId);
237+
}
238+
}
233239
if (backendIds.isEmpty()) {
234240
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
235241
throw new LoadException("Failed to get info. No alive backends");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.kafka.clients.admin.AdminClient
19+
import org.apache.kafka.clients.producer.KafkaProducer
20+
import org.apache.kafka.clients.producer.ProducerRecord
21+
import org.apache.kafka.clients.producer.ProducerConfig
22+
23+
// Regression test for PR #50421: once a backend is marked with
// "disable_load" (or decommissioned), routine-load fetch-meta requests must
// not select it. With every eligible BE excluded, the routine load job's
// "reason of state changed" must report "Failed to get info. No alive backends".
// Marked nonConcurrent because it flips a cluster-wide backend property.
suite("test_disable_load","nonConcurrent,p0") {
    String enabled = context.config.otherConfigs.get("enableKafkaTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // 1. Produce the CSV fixture rows into the Kafka topic so the
        //    routine load job has data (and meta) to fetch.
        def kafkaCsvTpoics = [
            "test_disable_load",
        ]
        String kafka_port = context.config.otherConfigs.get("kafka_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        def kafka_broker = "${externalEnvIp}:${kafka_port}"
        def props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        def producer = new KafkaProducer<>(props)
        try {
            for (String kafkaCsvTopic in kafkaCsvTpoics) {
                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
                def lines = txt.readLines()
                lines.each { line ->
                    logger.info("=====${line}========")
                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
                    producer.send(record)
                }
            }
        } finally {
            // Flush pending sends and release the client's threads/sockets;
            // the original test leaked the producer.
            producer.close()
        }

        // 2. Create the target table and the routine load job.
        def tableName = "test_disable_load"
        def job = "test_disable_load"
        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `k1` int(20) NULL,
                `k2` string NULL,
                `v1` date NULL,
                `v2` string NULL,
                `v3` datetime NULL,
                `v4` string NULL
            ) ENGINE=OLAP
            DUPLICATE KEY(`k1`)
            COMMENT 'OLAP'
            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
        """

        // Disable load on the first backend. NOTE(review): this assumes a
        // single-BE test cluster, otherwise other BEs would still serve the
        // fetch-meta request and the expected error would never appear.
        def backends = sql_return_maparray('show backends')
        def beId = backends[0].BackendId
        try {
            sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "true"); """
            sql """
                CREATE ROUTINE LOAD ${job} ON ${tableName}
                COLUMNS TERMINATED BY ","
                FROM KAFKA
                (
                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                    "kafka_topic" = "${kafkaCsvTpoics[0]}"
                );
            """

            // 3. Poll (up to ~60s) until the job reports the expected
            //    "no alive backends" reason; fail the test on timeout.
            def count = 0
            def res = 0
            while (true) {
                res = sql "select count(*) from ${tableName}"
                log.info("res: ${res}")
                def state = sql "show routine load for ${job}"
                // Column 8 = State, column 17 = ReasonOfStateChanged in
                // SHOW ROUTINE LOAD output.
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (state[0][17].toString().contains("Failed to get info. No alive backends")) {
                    break
                }
                if (count >= 60) {
                    log.error("routine load test fail")
                    assertEquals(1, 2)
                    break
                }
                sleep(1000)
                count++
            }
        } finally {
            // BUG FIX: the original wrote "fasle", which left disable_load
            // permanently set to "true" on the backend after the test,
            // breaking every subsequent load test on the shared cluster.
            sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "false"); """
            sql "stop routine load for ${job}"
        }
    }
}

0 commit comments

Comments
 (0)