1
+ import base64
1
2
import datetime
2
- import os
3
3
import random
4
4
import time
5
+ import uuid
5
6
from typing import Tuple
6
7
7
8
from testcontainers .core .container import DockerContainer
8
9
9
10
10
11
class ContainerHelper :
11
12
@staticmethod
12
- def create_kafka_container () -> Tuple [DockerContainer , str , int , int ]:
13
+ def create_kafka_container () -> Tuple [DockerContainer , str , int ]:
13
14
"""
14
- Returns (kafka container, broker list, kafka port, zookeper port ) tuple
15
+ Returns (kafka container, broker list, kafka port) tuple
15
16
"""
16
17
kafka_address = "127.0.0.1"
17
- zookeeper_port = random .randint (12000 , 15000 )
18
18
kafka_port = random .randint (16000 , 20000 )
19
- broker_list = "{0}:{1}" .format (kafka_address , kafka_port )
19
+ broker_list = f"{ kafka_address } :{ kafka_port } "
20
+ docker_hostname = uuid .uuid4 ().hex
21
+ docker_image_name = "confluentinc/cp-kafka:7.6.1"
22
+ kraft_cluster_id = base64 .urlsafe_b64encode (uuid .uuid4 ().bytes ).decode ()
20
23
21
- archname = os .uname ().machine
22
- if archname == "arm64" :
23
- kafka_container = DockerContainer (image = "dougdonohoe/fast-data-dev:latest" )
24
- else :
25
- kafka_container = DockerContainer (image = "lensesio/fast-data-dev:3.3.1" )
26
- kafka_container = kafka_container .with_env ("BROKER_PORT" , kafka_port )
27
- kafka_container = kafka_container .with_bind_ports (kafka_port , kafka_port )
28
- kafka_container = kafka_container .with_env ("ZK_PORT" , zookeeper_port )
29
- kafka_container = kafka_container .with_bind_ports (
30
- zookeeper_port , zookeeper_port
24
+ kafka_container = (
25
+ DockerContainer (image = docker_image_name , hostname = docker_hostname )
26
+ .with_env ("KAFKA_NODE_ID" , "0" )
27
+ .with_env ("KAFKA_PROCESS_ROLES" , "controller,broker" )
28
+ .with_env (
29
+ "KAFKA_LISTENERS" ,
30
+ f"PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:{ kafka_port } " ,
31
+ )
32
+ .with_env (
33
+ "KAFKA_ADVERTISED_LISTENERS" ,
34
+ f"PLAINTEXT://localhost:9092,EXTERNAL://{ broker_list } " ,
35
+ )
36
+ .with_env (
37
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ,
38
+ "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT" ,
39
+ )
40
+ .with_env ("KAFKA_CONTROLLER_QUORUM_VOTERS" , f"0@{ docker_hostname } :9093" )
41
+ .with_env ("KAFKA_CONTROLLER_LISTENER_NAMES" , "CONTROLLER" )
42
+ .with_env ("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" , "1" )
43
+ .with_env ("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS" , "10" )
44
+ .with_env ("CLUSTER_ID" , kraft_cluster_id )
45
+ .with_bind_ports (kafka_port , kafka_port )
31
46
)
32
- kafka_container = kafka_container .with_env ("ADV_HOST" , kafka_address )
33
- # disable rest
34
- kafka_container = kafka_container .with_env ("REST_PORT" , 0 )
35
- kafka_container = kafka_container .with_env ("WEB_PORT" , 0 )
36
- kafka_container = kafka_container .with_env ("CONNECT_PORT" , 0 )
37
- kafka_container = kafka_container .with_env ("REGISTRY_PORT" , 0 )
38
- kafka_container = kafka_container .with_env ("RUNTESTS" , 0 )
39
- kafka_container = kafka_container .with_env ("SAMPLEDATA" , 0 )
40
- kafka_container = kafka_container .with_env ("FORWARDLOGS" , 0 )
41
- kafka_container = kafka_container .with_env ("SUPERVISORWEB" , 0 )
42
-
43
- return (kafka_container , broker_list , kafka_port , zookeeper_port )
47
+ return kafka_container , broker_list , kafka_port
44
48
45
49
@staticmethod
46
50
def start_kafka_container (kafka_container : DockerContainer ) -> None :
@@ -50,8 +54,8 @@ def start_kafka_container(kafka_container: DockerContainer) -> None:
50
54
while cut_off > datetime .datetime .utcnow ():
51
55
time .sleep (0.5 )
52
56
logs = kafka_container .get_logs ()
53
- if "success: broker entered RUNNING state" in logs [ 0 ]. decode (
54
- "utf-8"
55
- ) and "success: zookeeper entered RUNNING state " in logs [ 0 ]. decode ( "utf-8" ) :
56
- return
57
- raise Exception ("Failed to start container" )
57
+ for line in logs :
58
+ line = line . decode ()
59
+ if "Kafka Server started " in line :
60
+ return
61
+ raise TimeoutError ("Failed to start container" )
0 commit comments