-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
Kafka Connect
- 특정한 Code 없이 Configuration으로 Data를 Import/Export 가능
- Standalone/Distribution Mode 지원
- RESTful API 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, MySQL, etc)
- Kafka Connect Source : 특정 시스템에서 Kafak Cluster로 데이터를 가져오는 Layer
- Kafka Connect Sink : Kafka CLuster의 데이터를 특정 시스템으로 보내는 Layer
Kafka Connect 설치
- url -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
- tar xvf confluent-community-6.1.0.tar.gz
- cd $KAFKA_CONNECT_HOME
Kafka Connect 설정
- $KAFKA_HOME/config/connect-distributed.properties
Kafka Connect 실행
- ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Kafka Connect Topic 조회
- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Kafka Connect에서 JDBC Connector를 이용한 데이터 Export를 위한 Plugin
~etc/kafka/connect-distributed.properties 파일 마지막 아래 plugin 정보 추가
- plugin.path=~/confluentinc-kafka-connect-jdbc-10.0.1/lib
- JdbcSourceConnector에서 RDB 사용을 위한 드라이버 복사
- ./shre/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar 파일 복사
docker-compose.yml
주문 생성을 위한 Kafka Sink Connector 추가
POST 127.0.0.1:8083/connectors
{
"name": "order-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://127.0.0.1:3306/order",
"connection.user": "ecommerce",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "orders"
}
}
Kafka Connect REST API
- GET /connectors : 커넥터 목록 조회
- GET /connectors/{name} : {name}을 갖는 커넥터 상세 정보 조회
- POST /connectors : 커넥터 생성 (Body = JSON Object 타입의 커넥터 config정보)
- GET /connectors/{name}/status : 이 커넥터가 running인지, failed인지 paused 인지 현재 상태 조회
- DELETE /connectors/{name} : {name}을 갖는 커넥터 삭제
- GET /connector-plugins : 카프카 커넥터 클러스터 내부에 설치된 플러그인 목록 조회
Reference URL
Metadata
Metadata
Assignees
Labels
No labels