1
+ // Licensed to the Apache Software Foundation (ASF) under one
2
+ // or more contributor license agreements. See the NOTICE file
3
+ // distributed with this work for additional information
4
+ // regarding copyright ownership. The ASF licenses this file
5
+ // to you under the Apache License, Version 2.0 (the
6
+ // "License"); you may not use this file except in compliance
7
+ // with the License. You may obtain a copy of the License at
8
+ //
9
+ // http://www.apache.org/licenses/LICENSE-2.0
10
+ //
11
+ // Unless required by applicable law or agreed to in writing,
12
+ // software distributed under the License is distributed on an
13
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
+ // KIND, either express or implied. See the License for the
15
+ // specific language governing permissions and limitations
16
+ // under the License.
17
+
18
+ import org.apache.kafka.clients.admin.AdminClient
19
+ import org.apache.kafka.clients.producer.KafkaProducer
20
+ import org.apache.kafka.clients.producer.ProducerRecord
21
+ import org.apache.kafka.clients.producer.ProducerConfig
22
+
23
+ suite(" test_disable_load" ," nonConcurrent,p0" ) {
24
+ String enabled = context. config. otherConfigs. get(" enableKafkaTest" )
25
+ if (enabled != null && enabled. equalsIgnoreCase(" true" )) {
26
+ // 1. send data
27
+ def kafkaCsvTpoics = [
28
+ " test_disable_load" ,
29
+ ]
30
+ String kafka_port = context. config. otherConfigs. get(" kafka_port" )
31
+ String externalEnvIp = context. config. otherConfigs. get(" externalEnvIp" )
32
+ def kafka_broker = " ${ externalEnvIp} :${ kafka_port} "
33
+ def props = new Properties ()
34
+ props. put(ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , " ${ kafka_broker} " . toString())
35
+ props. put(ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , " org.apache.kafka.common.serialization.StringSerializer" )
36
+ props. put(ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , " org.apache.kafka.common.serialization.StringSerializer" )
37
+ def producer = new KafkaProducer<> (props)
38
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
39
+ def txt = new File (""" ${ context.file.parent} /data/${ kafkaCsvTopic} .csv""" ). text
40
+ def lines = txt. readLines()
41
+ lines. each { line ->
42
+ logger. info(" =====${ line} ========" )
43
+ def record = new ProducerRecord<> (kafkaCsvTopic, null , line)
44
+ producer. send(record)
45
+ }
46
+ }
47
+
48
+ // 2. create table and routine load job
49
+ def tableName = " test_disable_load"
50
+ def job = " test_progress"
51
+ sql """ DROP TABLE IF EXISTS ${ tableName} """
52
+ sql """
53
+ CREATE TABLE IF NOT EXISTS ${ tableName} (
54
+ `k1` int(20) NULL,
55
+ `k2` string NULL,
56
+ `v1` date NULL,
57
+ `v2` string NULL,
58
+ `v3` datetime NULL,
59
+ `v4` string NULL
60
+ ) ENGINE=OLAP
61
+ DUPLICATE KEY(`k1`)
62
+ COMMENT 'OLAP'
63
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
64
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
65
+ """
66
+
67
+ def backends = sql_return_maparray(' show backends' )
68
+ def beId = backends[0 ]. BackendId
69
+ try {
70
+ sql """ ALTER SYSTEM MODIFY BACKEND "${ beId} " SET ("disable_load" = "true"); """
71
+ sql """
72
+ CREATE ROUTINE LOAD ${ job} ON ${ tableName}
73
+ COLUMNS TERMINATED BY ","
74
+ FROM KAFKA
75
+ (
76
+ "kafka_broker_list" = "${ externalEnvIp} :${ kafka_port} ",
77
+ "kafka_topic" = "${ kafkaCsvTpoics[0]} "
78
+ );
79
+ """
80
+
81
+ // 3. check error info
82
+ def count = 0
83
+ def res = 0
84
+ while (true ) {
85
+ res = sql " select count(*) from ${ tableName} "
86
+ log. info(" res: ${ res} " )
87
+ def state = sql " show routine load for ${ job} "
88
+ log. info(" routine load state: ${ state[0][8].toString()} " . toString())
89
+ log. info(" reason of state changed: ${ state[0][17].toString()} " . toString())
90
+ if (state[0 ][17 ]. toString(). contains(" Failed to get info. No alive backends" )) {
91
+ break
92
+ }
93
+ if (count >= 60 ) {
94
+ log. error(" routine load test fail" )
95
+ assertEquals (1 , 2 )
96
+ break
97
+ }
98
+ sleep(1000 )
99
+ count++
100
+ }
101
+ } finally {
102
+ sql """ ALTER SYSTEM MODIFY BACKEND "${ beId} " SET ("disable_load" = "fasle"); """
103
+ sql " stop routine load for ${ job} "
104
+ }
105
+ }
106
+ }
0 commit comments