From c66a12bb45339c5d8c0436f3b99805e6a2441060 Mon Sep 17 00:00:00 2001 From: hui lai Date: Mon, 28 Apr 2025 19:44:54 +0800 Subject: [PATCH] [fix](load) disable fetching meta request to disable_load or decommissioned BE node (#50421) If user disable load in a certain node:`ALTER SYSTEM MODIFY BACKEND "xxx" SET ("disable_load" = "true");`, planner will not select this node to load but fetch meta request still select the node. The pr disable fetch meta request when set disable_load. --- .../doris/datasource/kafka/KafkaUtil.java | 8 +- .../routine_load/data/test_disable_load.csv | 1 + .../routine_load/test_disable_load.groovy | 106 ++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/load_p0/routine_load/data/test_disable_load.csv create mode 100644 regression-test/suites/load_p0/routine_load/test_disable_load.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java index 293f0875ac0cd1..da683cc2b371c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java @@ -229,7 +229,13 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx InternalService.PProxyResult result = null; try { while (retryTimes < 3) { - List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + List backendIds = new ArrayList<>(); + for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) { + backendIds.add(beId); + } + } if (backendIds.isEmpty()) { MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L); throw new LoadException("Failed to get info. 
No alive backends"); diff --git a/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv b/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv new file mode 100644 index 00000000000000..b226b99ee4e0e0 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv @@ -0,0 +1 @@ +1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi" \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy new file mode 100644 index 00000000000000..fbac33938490d6 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_disable_load","nonConcurrent,p0") { + String enabled = context.config.otherConfigs.get("enableKafkaTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // 1. 
send data + def kafkaCsvTpoics = [ + "test_disable_load", + ] + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + def producer = new KafkaProducer<>(props) + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + + // 2. create table and routine load job + def tableName = "test_disable_load" + def job = "test_disable_load" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + def backends = sql_return_maparray('show backends') + def beId = backends[0].BackendId + try { + sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "true"); """ + sql """ + CREATE ROUTINE LOAD ${job} ON ${tableName} + COLUMNS TERMINATED BY "," + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}" + ); + """ + + // 3. 
check error info + def count = 0 + def res = 0 + while (true) { + res = sql "select count(*) from ${tableName}" + log.info("res: ${res}") + def state = sql "show routine load for ${job}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (state[0][17].toString().contains("Failed to get info. No alive backends")) { + break + } + if (count >= 60) { + log.error("routine load test fail") + assertEquals(1, 2) + break + } + sleep(1000) + count++ + } + } finally { + sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "false"); """ + sql "stop routine load for ${job}" + } + } +} \ No newline at end of file