diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 75510f52c374..ec4f694f61ec 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -77,7 +77,7 @@ local schema = {
                                 mechanism = {
                                     type = "string",
                                     default = "PLAIN",
-                                    enum = {"PLAIN"},
+                                    enum = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"},
                                 },
                                 user = { type = "string", description = "user" },
                                 password = { type = "string", description = "password" },
diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh
index 2da891e567e6..7b60607f5851 100755
--- a/ci/init-plugin-test-service.sh
+++ b/ci/init-plugin-test-service.sh
@@ -20,7 +20,15 @@ after() {
     docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
     docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
     docker exec -i apache-apisix-kafka-server2-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
-
+    docker exec -i apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic test-scram-256
+    docker exec -i apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic test-scram-512
+    # Create the admin user with both SCRAM-SHA-256 and SCRAM-SHA-512 credentials
+    docker exec apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-configs.sh \
+        --zookeeper zookeeper-server3:2181 \
+        --alter \
+        --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
+        --entity-type users \
+        --entity-name admin
     # prepare openwhisk env
     docker pull openwhisk/action-nodejs-v14:1.20.0
     docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v /var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:1.0.0
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 22d0303f9700..2c026b76ab14 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -80,6 +80,16 @@ services:
     networks:
       kafka_net:
 
+  zookeeper-server3:
+    image: bitnamilegacy/zookeeper:3.6.0
+    env_file:
+      - ci/pod/kafka/zookeeper-server/env/common.env
+    restart: unless-stopped
+    ports:
+      - "12182:12181"
+    networks:
+      kafka_net_2:
+
   kafka-server1:
     image: bitnamilegacy/kafka:2.8.1
     env_file:
@@ -113,6 +123,25 @@ services:
     volumes:
       - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
 
+  kafka-server3-scram:
+    image: bitnamilegacy/kafka:2.8.1
+    env_file:
+      - ci/pod/kafka/kafka-server/env/common3-scram.env
+    environment:
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server3:2181
+    restart: unless-stopped
+    ports:
+      - "29092:29092"  # PLAINTEXT for inter-broker communication
+      - "29094:29094"  # SASL_PLAINTEXT with SCRAM for clients
+    depends_on:
+      - zookeeper-server1
+      - zookeeper-server2
+      - zookeeper-server3
+    networks:
+      kafka_net_2:
+    volumes:
+      - ./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
+
 ## SkyWalking
   skywalking:
     image: apache/skywalking-oap-server:8.7.0-es6
@@ -392,6 +421,7 @@ services:
 networks:
   apisix_net:
   kafka_net:
+  kafka_net_2:
   skywalk_net:
   rocketmq_net:
   opa_net:
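Before involving APISIX, it can help to confirm that the SCRAM listener added above accepts the `admin` credentials on its own. The following smoke test is a sketch, not part of the patch: the `/tmp/client-scram.properties` path and the use of `kafka-console-producer.sh` are assumptions, though the properties themselves are the standard Kafka SCRAM client settings.

```sh
# Assumed helper: write a SCRAM client config inside the broker container
docker exec -i apache-apisix-kafka-server3-scram-1 bash -c 'cat > /tmp/client-scram.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
EOF'

# Produce one record through the SASL listener on 29094; a credential or
# mechanism mismatch fails here with the same SaslAuthenticationException
# that the plugin tests below assert on.
echo hello | docker exec -i apache-apisix-kafka-server3-scram-1 \
    /opt/bitnami/kafka/bin/kafka-console-producer.sh \
    --broker-list 127.0.0.1:29094 \
    --topic test-scram-256 \
    --producer.config /tmp/client-scram.properties
```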
diff --git a/ci/pod/kafka/kafka-server/env/common3-scram.env b/ci/pod/kafka/kafka-server/env/common3-scram.env
new file mode 100644
index 000000000000..945d544b6905
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/env/common3-scram.env
@@ -0,0 +1,20 @@
+ALLOW_PLAINTEXT_LISTENER=yes
+KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
+
+# Plain listener for inter-broker traffic, SASL_PLAINTEXT listener for SCRAM clients
+KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:29092,SASL_PLAINTEXT://0.0.0.0:29094
+KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server3-scram:29092,SASL_PLAINTEXT://127.0.0.1:29094
+
+# SCRAM-specific configuration
+KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256,SCRAM-SHA-512
+KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Inter-broker communication stays on PLAINTEXT (single-node cluster)
+KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Explicitly map each listener to its security protocol
+KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
+
+# Other configurations
+KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
+KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
diff --git a/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
new file mode 100644
index 000000000000..01c832bd3659
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+KafkaServer {
+   org.apache.kafka.common.security.scram.ScramLoginModule required
+   username="admin"
+   password="admin-secret";
+   org.apache.kafka.common.security.plain.PlainLoginModule required
+   username="admin"
+   password="admin-secret"
+   user_admin="admin-secret";
+};
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index a1a717c5e95e..de8bddd39151 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -42,7 +42,7 @@ It might take some time to receive the log data. It will be automatically sent a
 | brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. |
 | brokers.port | integer | True | | [0, 65535] | The port of Kafka broker |
 | brokers.sasl_config | object | False | | | The sasl config of Kafka broker |
-| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config |
+| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of sasl config |
 | brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
 | brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
 | kafka_topic | string | True | | | Target topic to push the logs for organisation. |
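The table rows above only list the new enum values; for a concrete picture, a route that logs through the SCRAM listener looks like the following sketch. It mirrors the test cases in this patch; the Admin API address and the default `X-API-KEY` are the usual local-deployment values, not something this patch introduces.

```sh
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "uri": "/hello",
    "plugins": {
        "kafka-logger": {
            "brokers": [{
                "host": "127.0.0.1",
                "port": 29094,
                "sasl_config": {
                    "mechanism": "SCRAM-SHA-512",
                    "user": "admin",
                    "password": "admin-secret"
                }
            }],
            "kafka_topic": "test-scram-512",
            "batch_max_size": 1
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": { "127.0.0.1:1980": 1 }
    }
}'
```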
diff --git a/docs/zh/latest/plugins/error-log-logger.md b/docs/zh/latest/plugins/error-log-logger.md
index d9d559031fca..508955bf325c 100644
--- a/docs/zh/latest/plugins/error-log-logger.md
+++ b/docs/zh/latest/plugins/error-log-logger.md
@@ -51,7 +51,7 @@ description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX
 | kafka.brokers.host | string | 是 | | | Kafka broker 的节点 host 配置,例如 `192.168.1.1` |
 | kafka.brokers.port | string | 是 | | | Kafka broker 的节点端口配置 |
 | kafka.brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config |
-| kafka.brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 |
+| kafka.brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | Kafka broker 中的 sasl 认证机制 |
 | kafka.brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 |
 | kafka.brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 |
 | kafka.kafka_topic | string | 是 | | | 需要推送的 Kafka topic。|
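The zh doc for error-log-logger picks up the same enum. That plugin is configured through plugin metadata rather than per route, so enabling SCRAM there might look like the sketch below. This assumes error-log-logger is already enabled in `conf/config.yaml`; the topic name is made up for illustration.

```sh
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "kafka": {
        "brokers": [{
            "host": "127.0.0.1",
            "port": 29094,
            "sasl_config": {
                "mechanism": "SCRAM-SHA-256",
                "user": "admin",
                "password": "admin-secret"
            }
        }],
        "kafka_topic": "apisix-error-logs"
    },
    "level": "ERROR"
}'
```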
diff --git a/t/plugin/kafka-logger4.t b/t/plugin/kafka-logger4.t
new file mode 100644
index 000000000000..b4d3e456d860
--- /dev/null
+++ b/t/plugin/kafka-logger4.t
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set route with correct sasl_config - SCRAM-SHA-256
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-256",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                                }
+                            }],
+                            "kafka_topic":"test-scram-256",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 3: set route with incorrect password - SCRAM-SHA-256
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-256",
+                                    "user":"admin",
+                                    "password":"admin-secrets"
+                                }
+                            }],
+                            "kafka_topic":"test-scram-256",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
+--- wait: 2
+
+
+
+=== TEST 5: set route with correct sasl_config - SCRAM-SHA-512
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-512",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                                }
+                            }],
+                            "kafka_topic":"test-scram-512",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 7: set route with incorrect password - SCRAM-SHA-512
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-512",
+                                    "user":"admin",
+                                    "password":"admin-secrets"
+                                }
+                            }],
+                            "kafka_topic":"test-scram-512",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 8: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
+--- wait: 2
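If TEST 2 or TEST 6 ever starts failing with the authentication error asserted in TEST 4 and TEST 8, one quick check is whether the SCRAM credentials were actually registered in ZooKeeper. This is standard `kafka-configs.sh` usage against the CI container, not part of the patch itself:

```sh
# Describe the admin user entity; the output should list both
# SCRAM-SHA-256 and SCRAM-SHA-512 salted credentials for "admin".
docker exec apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-configs.sh \
    --zookeeper zookeeper-server3:2181 \
    --describe \
    --entity-type users \
    --entity-name admin
```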