이번 글에서는 kubernetes에 배포되어 있는 kafka cluster에 kakfa consumer 와 kafka producer를 연동하여 메시지를 송/수신하는 방법을 설명합니다.
kafka consumer와 producer는 kafka에서 default로 제공하는 kafka-console-consumer 와 kafka-console-producer를 이용할 것입니다. 많은 블로그에 올라와 있는 글에 kafka-console-consumer/producer 사용하는 방법을 확인하고, Kubernetes 환경에서는 어떻게 연동하는지를 알아보겠습니다.
kafka cluster에 연동하기 위해 Kubernetes 기반 kafka 클러스터 구성 (1/4) 글에서 배포한 kafka-client를 사용합니다.
1. kafka topics 목록 확인.
k8s 에서 kafka-client pod 연계하여 kafka cli 명령어로 topic 목록을 조회합니다.
kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --list
kafka를 배포 후 아래의 명령어를 수행하면 생성된 topic 이 없기 때문에 목록 조회 결과가 없습니다.
2. kafka topics 생성.
kafka cluster에 테스트용으로 topic 명칭을 “test01” 으로 kafka topics을 생성합니다.
kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --create --topic test01 --partitions 1 --replication-factor 3
[수행 결과]
Created topic “test01”.
topic 목록 다시 조회를 수행합니다.
kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --list</code>
[수행 결과].
test01
3. kafka-console-consumer 로 메시지 수신 대기
kafka-console-consumer를 수행합니다.
kubectl exec -it kafka-client -- kafka-console-consumer --bootstrap-server kafka:9092 --topic test01
4. kafka-console-producer 로 메시지 송신
kafka-console-producer를 실행합니다.
kubectl exec -it kafka-client -- kafka-console-producer --broker-list kafka:9092 --topic test01
kafka-console-producer 창에서 메시시를 송신하기 위해 “test” 입력
[수행 결과]
kafka-console-consumer 창에서 메시시를 수신 결과
[수행 결과]