Skip to content

Kafka on kubernetes

kimhyungkook edited this page Feb 25, 2019 · 7 revisions

1. 카프카 설치

설치 및 제거

  1. helm 선정
    https://github.com/helm/charts/tree/master/incubator/kafka

선정이유 : 가장 많은 star 수와, 관련된 자료가 많음.
confluent 에서 kafka 관련한 솔루션과 ksql 를 개발하는등 많은 활동을 하고있음.

  1. 기본설치
kubectl create ns kafka
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install --name my-kafka --namespace kafka incubator/kafka

2-1 helm 설치가 안되어있다면 아래 스크립트 실행

curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
kubectl --namespace kube-system create sa tiller
kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller
helm repo update
  1. 제거
# --purge 옵션으로 관련된 모든 정보를 지운다. 
helm delete my-kafka --purge
  1. 버전확인
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --version

>> 2.0.1-cp1 (Commit:815feb8a888d39d9)

설치 후 기본 조회 작업

# 토픽 리스트 보기

kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --list

# 토픽 consumer console

kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic delpoyMsgTopic --from-beginning

kubectl -n kafka exec -ti my-kafka-1 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic topic.consumer --from-beginning

# 컨슈머와 오프셋 확인

kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-consumer-groups --bootstrap-server my-kafka:9092 --group foo --describe

kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --topic test1 --describe

# 토픽생성

kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1

# To listen for messages on a topic:

kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic test1 --from-beginning

To stop the listener session above press: Ctrl+C

# To start an interactive message producer session:

kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-producer --broker-list my-kafka:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C


2. 운영시 고려사항

파티션과 컨슈머 간의 구성

topic 은 파티션 단위로 쪼개져서 클러스터에 분산 저장됨

  • topic 메세지를 발생시간 순으로 처리하려면 1개의 파티션만 가지도록 설계를 해야한다.
  • 파티션이 여러개일 경우 메세지는 분산되어 각 파티션에 저장이 된다.
  • 파티션이 여러개일 경우 각 파티션별로 도착되는 메세지는 서로 다른 offset 을 가지고 있다.
    (0번 파티션에는 11개의 메세지, 1번 파티션에는 14개의 메세지 이런식으로 올수있다. )
  • 파티션의 메세지는 컨슈머 그룹별로 가져가게 된다.
    ( 3개의컨슈머 그룹이 있다면 3개의 컨슈머 그룹은 같은 내용을 받는다. )
  • 파티션에는 컨슈머 그룹안의 오직 1개의 컨슈머만 접근이 가능하다.
    ( 2개의 파티션에 1개의 컨슈머가 있다면 1개의 컨슈머는 1번,2번 파티션의 메세지를 모두 받는다.)
    ( 2개의 파티션에 2개의 컨슈머가 있다면 1번 컨슈머는 1번 파티션 내용을 받고, 2번 컨슈머는 2번 파티션 내용을 받는다)
    ( 2개의 파티션에 3개의 컨슈머가 있다면 1번 컨슈머는 1번 파티션 내용을 받고, 2번 컨슈머는 2번 파티션 내용을 받는다, 3번 컨슈머는 아무 내용도 수신을 하지않는다. )
    -->> 이렇게 파티션의 갯수보다 컨슈머의 수가 많으면 처리를 안하는 노드가 생기기때문제 적절하게 조정을 해야한다.

로그 설정

# 로그는 기본으로 설치시 아래와 같이 구성된다.

저장기간 : log.retention.hours : 168 (7일)
저장소 : /opt/kafka/data/logs
저장용량 : 1 GB

log.retention.hours: 저장기간
log.retention.bytes : The maximum size of the log before deleting it
log.segment.bytes : The maximum size of a single log file

# 로그확인

# broker 별로 다른 로그를 확인 할수있다. my-kafka-0, my-kafka-1, my-kafka-2
kubectl -n kafka exec -ti my-kafka-0 /bin/bash
cd /opt/kafka/data/logs

# 로그 저장 용량 확인

# broker 별로 다른 로그를 확인 할수있다. my-kafka-0, my-kafka-1, my-kafka-2
kubectl -n kafka exec -ti my-kafka-0 /bin/bash
df -h
>> /dev/xvdbx      976M  2.9M  958M   1% /opt/kafka/data

incubator/kafka 로 kafka 설치시 PersistentVolume 을 생성하여서 해당 Volumn 에 로그를 저장하여 놓는다.
별도의 storage 이기때문에, 카프카를 삭제하여도 같은 이름으로 재설치를 하거나, 해당 volumn 을 붙여준다면
기존의 log 및 topic 정보를 그대로 가져 올 수 있다.

참고 1
deployment 와 statefulset 은 생성시
spec.volumeClaimTemplates 을 설정하여
--> PersistentVolume 을 생성할수있다.
(provide stable storage using PersistentVolumes provisioned by a PersistentVolume Provisioner)

참고 2
kubernetes 에서는 개발자가 infra 를 설정안하고 PersistentVolumeClaim 을 통하여 user 가 storage 를 요청 할수있도록 구성하였다.
https://kubernetes.io/docs/concepts/storage/persistent-volumes/

# PersistentVolume 및 PersistentVolumeClaim 확인

kubectl get pv
kubectl get pvc -n kafka

로그 저장 공간 확장 (pv upgrade)

# 아래 방법은 kubernetes 1.11 버전부터 가능한 방법이다.

이전버전에서 작업시에는 kube-apiserver 에서 kubelet --feature-gates="ExpandPersistentVolumes=true" 라는 복잡한 과정을 거쳐야함

1.StorageClass를 생성해서 allowVolumeExpansion: true 를 준다

이 옵션이 설정되어있는 Storage 만 pvc 의 storage가 수정이가능하다

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: standard
parameters:
  # aws 에서 type은 gp2(범용SSD), sc1(콜드HDD) 등이 있다.
  type: gp2  
# AWS EBS, GCE PD, Azure Disk ...
provisioner: kubernetes.io/aws-ebs
allowVolumeExpansion: true
reclaimPolicy: Delete

2. 사이즈를 늘리려는 pv 와 pvc 를 조회하여 수정하여 준다.

적용후 약간의 시간이 걸린다. ( 대략 3분 )

spec:
  resources:
    requests:
      storage: 5Gi ---->> 10Gi 수정가능

3. kubectl get pv 를 하여 변경된 내용을 확인한다.

4. po 안쪽으로 들어가서 df -h 명령어를 실행하여 log 폴더가 늘어났는지를 확인한다.

5. pod 가 이미 연결이 되어있어서 기존값인 5Gi 로 데이터가 보인다.

6. kubectl get pvc -n kafka 을 하였을때도 아직 5Gi 로 보인다.

7. pod 재시작 or 카프카를 삭제하는 순간 변경된 값으로 pvc 가 셋팅된게 보인다.

scale up / down

  1. 카프카 broker size up
  • 방법1 : 삭제 후 옵션을 추가하여 재 인스톨
    --set replicas=4

  • 방법2 : 바로 statefulSet edit

kubectl get statefulSet -n kafka
kubectl edit statefulSet my-kafka -n kafka

#### replicas 변경
#### 저장
  1. zookeeper node size up
  • 방법1 : 삭제 후 옵션을 추가하여 재 인스톨
    --set zookeeper.replicaCount=5

  • 방법2 : 바로 statefulSet edit
    ( 분산코디네이터는 홀수단위로 설정 3->5 로 증설시 1초에 60,000개의 요청을 더 처리할수 있다고 함)

kubectl get statefulSet -n kafka
kubectl edit statefulSet my-kafka-zookeeper -n kafka

#### replicas 변경
#### 저장

3. sample code

full code github

springboot 2 기준 / spring-kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

producer

1. 메세지를 보낼때는 아래와 같이 보내면 된다.

kafkaTemplate.send(토픽명, String 보낼메세지 );

kafkaTemplate.send(토픽명, int 파티션 id , String 보낼메세지 );
--> 파티션 id 는 0부터 1씩 증가를 한다. 특정 파티션을 골라서 보내지 않는 이상 이와같은 방법은 추천안한다.

kafkaTemplate.send(토픽명, String key , String 보낼메세지 );
--> key 를 기준으로 데이터가 분배되어 파티션에 들어간다.
예를 들어 key 의 데이터가 "aa" , "ab" , "ba" , "bb", "ca" , "cb" 라고 가정한다면
3개의 파티션이 있을 경우 각 파티션은 두개의 key 에 해당하는 값을 나눠서 가져간다.

2. 다음 방법은 메세지를 보낸 후 callback 메세지를 받는 방법이다.

이렇게 쓰일때는
props.put(ProducerConfig.ACKS_CONFIG, "all");
같이 모든 메세지가 정상적으로 보내지고 난 후에 다른 작업을 할때 사용된다.

    public void sendMessage(String message) {

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("토픽명", message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Sent message=[" + message +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=["
                        + message + "] due to : " + ex.getMessage());
            }
        });
    }

consumer

1. 메세지를 받는 가장 간단한 방법 @KafkaListener(topics = "토픽명")

    @KafkaListener(topics = "${topic.cloudInstance}")
    public void listen(@Payload String message, Consumer<?, ?> consumer, ConsumerRecord<?, ?> consumerRecord) {
        // 데이터 베이스에 값을 넣을 경우에는 데이터를 저장 후에 수동 커밋을 해준다.
        consumer.commitAsync();
        LOG.info("listen record='{}' message='{}'", consumerRecord.offset() , message);
    }


    /**
     * 특정 파티션의 데이터를 가져오는 방법
     */
    @KafkaListener(topicPartitions
            = @TopicPartition(topic = "${topic.cloudInstance}", partitions = { "${topic.cloudInstance.partition}" })
            , groupId = "bar")
    public void listenWithPartition(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
        LOG.info("listenWithPartition record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
    }

    /**
     * 특정 파티션을 받겠다고 선언을 안해 놓으면, consumer 수가 늘어날수록, 알아서 partition을 분배하여 간다.
     */
    @KafkaListener(topics = "${topic.cloudInstance}")
    public void listenWithOutPartition(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
        LOG.info("listenWithPartition record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
    }

    /**
     * 특정 클래스로 받으려면 아래와 같이 객체로 변환을 하면 된다.
     * 주의점은 프로듀서쪽의 객체를 잘 마추어야 한다. - 별로 추천하지 않음
     */
    @KafkaListener(topics = "${topic.cloudInstance}")
    public void listenByObject(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
        CloudInstance cloudInstance = new Gson().fromJson(message, CloudInstance.class);
        LOG.info("accountId='{}'  status='{}' " , cloudInstance.accountId , cloudInstance.status );
    }


    /**
     * 특정 파티션의 특정 오프셋으로 부터 토픽을 시작하는 방법
     */
    @KafkaListener(topicPartitions
            = @TopicPartition(topic = "${topic.cloudInstance}",
            partitionOffsets = {
                    @PartitionOffset(partition = "${topic.cloudInstance.partition}" , initialOffset = "110" )
            }  )
            , groupId = "bar1")
    public void listenWithSpecificOffset(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
        LOG.info("listenWithSpecificOffset record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
    }

    /**
     * 이 아래 부분은 메세지를 받을때 선언된 Factory를 지정하는데
     * filterKafkaListenerContainerFactory 에는 특정 문구가 포함된 메세지는 거절하도록 구현되어있음
     */
    @KafkaListener(
            topics = "${topic.cloudInstance}",
            containerFactory = "filterKafkaListenerContainerFactory")
    public void listen2(String message, ConsumerRecord<?, ?> consumerRecord) {
        LOG.info("value='{}' ", consumerRecord.value());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordFilterStrategy(
                // 해당 메세지를 무시한다.
                record -> record.value().contains("Crash"));
        return factory;
    }

2. 데이터를 모아서 받는 방법

     /**
     * 이 아래 부분은 10초단위로 데이터를 묶어서 메세지를 pull 한다.
     * AUTO_OFFSET_RESET_CONFIG 내용이 없어서 시작할때마다 처음부터 모든 데이터를 가져온다.
     */

    @Bean
    public void consume(){
        KafkaConsumer<String, String> consumer = createKafkaConsumer();

        while (true) {
            int second = 10;
            ConsumerRecords<String, String> records = consumer.poll(ofSeconds(second));

            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);

                for(ConsumerRecord<String, String> record : partitionedRecords){
                    LOG.info("offset='{}' value='{}' ", record.offset(), record.value());
                }
            }

            LOG.info("===================== {} 초 여백 ==================== " , second);
        }
    }
    private  KafkaConsumer<String, String> createKafkaConsumer() {
        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(GROUP_ID_CONFIG, "foo");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(singleton(cloudInstance));
        return consumer;
    }
Clone this wiki locally