-
Notifications
You must be signed in to change notification settings - Fork 2
Kafka on kubernetes
선정이유 : 가장 많은 star 수와, 관련된 자료가 많음.
confluent 에서 kafka 관련한 솔루션과 ksql 를 개발하는등 많은 활동을 하고있음.
- 기본설치
kubectl create ns kafka
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install --name my-kafka --namespace kafka incubator/kafka
2-1 helm 설치가 안되어있다면 아래 스크립트 실행
curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
kubectl --namespace kube-system create sa tiller
kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller
helm repo update
- 제거
# --purge 옵션으로 관련된 모든 정보를 지운다.
helm delete my-kafka --purge
- 버전확인
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --version
>> 2.0.1-cp1 (Commit:815feb8a888d39d9)
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --list
kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic delpoyMsgTopic --from-beginning
kubectl -n kafka exec -ti my-kafka-1 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic topic.consumer --from-beginning
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-consumer-groups --bootstrap-server my-kafka:9092 --group foo --describe
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --topic test1 --describe
kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic test1 --from-beginning
To stop the listener session above press: Ctrl+C
kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-producer --broker-list my-kafka:9092 --topic test1
To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C
topic 은 파티션 단위로 쪼개져서 클러스터에 분산 저장됨
- topic 메세지를 발생시간 순으로 처리하려면 1개의 파티션만 가지도록 설계를 해야한다.
- 파티션이 여러개일 경우 메세지는 분산되어 각 파티션에 저장이 된다.
- 파티션이 여러개일 경우 각 파티션별로 도착되는 메세지는 서로 다른 offset 을 가지고 있다.
(0번 파티션에는 11개의 메세지, 1번 파티션에는 14개의 메세지 이런식으로 올수있다. ) - 파티션의 메세지는 컨슈머 그룹별로 가져가게 된다.
( 3개의컨슈머 그룹이 있다면 3개의 컨슈머 그룹은 같은 내용을 받는다. ) - 파티션에는 컨슈머 그룹안의 오직 1개의 컨슈머만 접근이 가능하다.
( 2개의 파티션에 1개의 컨슈머가 있다면 1개의 컨슈머는 1번,2번 파티션의 메세지를 모두 받는다.)
( 2개의 파티션에 2개의 컨슈머가 있다면 1번 컨슈머는 1번 파티션 내용을 받고, 2번 컨슈머는 2번 파티션 내용을 받는다)
( 2개의 파티션에 3개의 컨슈머가 있다면 1번 컨슈머는 1번 파티션 내용을 받고, 2번 컨슈머는 2번 파티션 내용을 받는다, 3번 컨슈머는 아무 내용도 수신을 하지않는다. )
-->> 이렇게 파티션의 갯수보다 컨슈머의 수가 많으면 처리를 안하는 노드가 생기기때문제 적절하게 조정을 해야한다.
저장기간 : log.retention.hours : 168 (7일)
저장소 : /opt/kafka/data/logs
저장용량 : 1 GB
log.retention.hours: 저장기간
log.retention.bytes : The maximum size of the log before deleting it
log.segment.bytes : The maximum size of a single log file
# broker 별로 다른 로그를 확인 할수있다. my-kafka-0, my-kafka-1, my-kafka-2
kubectl -n kafka exec -ti my-kafka-0 /bin/bash
cd /opt/kafka/data/logs
# broker 별로 다른 로그를 확인 할수있다. my-kafka-0, my-kafka-1, my-kafka-2
kubectl -n kafka exec -ti my-kafka-0 /bin/bash
df -h
>> /dev/xvdbx 976M 2.9M 958M 1% /opt/kafka/data
incubator/kafka 로 kafka 설치시 PersistentVolume 을 생성하여서 해당 Volumn 에 로그를 저장하여 놓는다.
별도의 storage 이기때문에, 카프카를 삭제하여도 같은 이름으로 재설치를 하거나, 해당 volumn 을 붙여준다면
기존의 log 및 topic 정보를 그대로 가져 올 수 있다.
참고 1
deployment 와 statefulset 은 생성시
spec.volumeClaimTemplates 을 설정하여
--> PersistentVolume 을 생성할수있다.
(provide stable storage using PersistentVolumes provisioned by a PersistentVolume Provisioner)
참고 2
kubernetes 에서는 개발자가 infra 를 설정안하고 PersistentVolumeClaim 을 통하여 user 가 storage 를 요청 할수있도록 구성하였다.
https://kubernetes.io/docs/concepts/storage/persistent-volumes/
kubectl get pv
kubectl get pvc -n kafka
이전버전에서 작업시에는 kube-apiserver 에서 kubelet --feature-gates="ExpandPersistentVolumes=true" 라는 복잡한 과정을 거쳐야함
이 옵션이 설정되어있는 Storage 만 pvc 의 storage가 수정이가능하다
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: standard
parameters:
# aws 에서 type은 gp2(범용SSD), sc1(콜드HDD) 등이 있다.
type: gp2
# AWS EBS, GCE PD, Azure Disk ...
provisioner: kubernetes.io/aws-ebs
allowVolumeExpansion: true
reclaimPolicy: Delete
적용후 약간의 시간이 걸린다. ( 대략 3분 )
spec:
resources:
requests:
storage: 5Gi ---->> 10Gi 수정가능
- 카프카 broker size up
-
방법1 : 삭제 후 옵션을 추가하여 재 인스톨
--set replicas=4
-
방법2 : 바로 statefulSet edit
kubectl get statefulSet -n kafka
kubectl edit statefulSet my-kafka -n kafka
#### replicas 변경
#### 저장
- zookeeper node size up
-
방법1 : 삭제 후 옵션을 추가하여 재 인스톨
--set zookeeper.replicaCount=5
-
방법2 : 바로 statefulSet edit
( 분산코디네이터는 홀수단위로 설정 3->5 로 증설시 1초에 60,000개의 요청을 더 처리할수 있다고 함)
kubectl get statefulSet -n kafka
kubectl edit statefulSet my-kafka-zookeeper -n kafka
#### replicas 변경
#### 저장
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafkaTemplate.send(토픽명, String 보낼메세지 );
kafkaTemplate.send(토픽명, int 파티션 id , String 보낼메세지 );
--> 파티션 id 는 0부터 1씩 증가를 한다. 특정 파티션을 골라서 보내지 않는 이상 이와같은 방법은 추천안한다.
kafkaTemplate.send(토픽명, String key , String 보낼메세지 );
--> key 를 기준으로 데이터가 분배되어 파티션에 들어간다.
예를 들어 key 의 데이터가 "aa" , "ab" , "ba" , "bb", "ca" , "cb" 라고 가정한다면
3개의 파티션이 있을 경우 각 파티션은 두개의 key 에 해당하는 값을 나눠서 가져간다.
이렇게 쓰일때는
props.put(ProducerConfig.ACKS_CONFIG, "all");
같이 모든 메세지가 정상적으로 보내지고 난 후에 다른 작업을 할때 사용된다.
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("토픽명", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
@KafkaListener(topics = "${topic.cloudInstance}")
public void listen(@Payload String message, Consumer<?, ?> consumer, ConsumerRecord<?, ?> consumerRecord) {
// 데이터 베이스에 값을 넣을 경우에는 데이터를 저장 후에 수동 커밋을 해준다.
consumer.commitAsync();
LOG.info("listen record='{}' message='{}'", consumerRecord.offset() , message);
}
/**
* 특정 파티션의 데이터를 가져오는 방법
*/
@KafkaListener(topicPartitions
= @TopicPartition(topic = "${topic.cloudInstance}", partitions = { "${topic.cloudInstance.partition}" })
, groupId = "bar")
public void listenWithPartition(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
LOG.info("listenWithPartition record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
}
/**
* 특정 파티션을 받겠다고 선언을 안해 놓으면, consumer 수가 늘어날수록, 알아서 partition을 분배하여 간다.
*/
@KafkaListener(topics = "${topic.cloudInstance}")
public void listenWithOutPartition(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
LOG.info("listenWithPartition record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
}
/**
* 특정 클래스로 받으려면 아래와 같이 객체로 변환을 하면 된다.
* 주의점은 프로듀서쪽의 객체를 잘 마추어야 한다. - 별로 추천하지 않음
*/
@KafkaListener(topics = "${topic.cloudInstance}")
public void listenByObject(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
CloudInstance cloudInstance = new Gson().fromJson(message, CloudInstance.class);
LOG.info("accountId='{}' status='{}' " , cloudInstance.accountId , cloudInstance.status );
}
/**
* 특정 파티션의 특정 오프셋으로 부터 토픽을 시작하는 방법
*/
@KafkaListener(topicPartitions
= @TopicPartition(topic = "${topic.cloudInstance}",
partitionOffsets = {
@PartitionOffset(partition = "${topic.cloudInstance.partition}" , initialOffset = "110" )
} )
, groupId = "bar1")
public void listenWithSpecificOffset(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
LOG.info("listenWithSpecificOffset record='{}' message='{}' partition='{}'", consumerRecord.offset() , message, consumerRecord.partition());
}
/**
* 이 아래 부분은 메세지를 받을때 선언된 Factory를 지정하는데
* filterKafkaListenerContainerFactory 에는 특정 문구가 포함된 메세지는 거절하도록 구현되어있음
*/
@KafkaListener(
topics = "${topic.cloudInstance}",
containerFactory = "filterKafkaListenerContainerFactory")
public void listen2(String message, ConsumerRecord<?, ?> consumerRecord) {
LOG.info("value='{}' ", consumerRecord.value());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
// 해당 메세지를 무시한다.
record -> record.value().contains("Crash"));
return factory;
}
/**
* 이 아래 부분은 10초단위로 데이터를 묶어서 메세지를 pull 한다.
* AUTO_OFFSET_RESET_CONFIG 내용이 없어서 시작할때마다 처음부터 모든 데이터를 가져온다.
*/
@Bean
public void consume(){
KafkaConsumer<String, String> consumer = createKafkaConsumer();
while (true) {
int second = 10;
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(second));
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
for(ConsumerRecord<String, String> record : partitionedRecords){
LOG.info("offset='{}' value='{}' ", record.offset(), record.value());
}
}
LOG.info("===================== {} 초 여백 ==================== " , second);
}
}
private KafkaConsumer<String, String> createKafkaConsumer() {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(GROUP_ID_CONFIG, "foo");
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(singleton(cloudInstance));
return consumer;
}