이번 글에서는 kubernetes에 배포되어 있는 kafka cluster에 kakfa consumer 와 kafka producer를 연동하여 메시지를 송/수신하는 방법을 설명합니다.
이 글에서는 kubernetes 의 약어로 k8s로 명칭을 사용합니다.
kafka consumer와 producer는 kafka에서 default로 제공하는 kafka-console-consumer 와 kafka-console-producer를 이용할 것입니다.
많은 블로그에 올라와 있는 글에 kafka-console-consumer/producer 사용하는 방법과 k8s 환경에서는 어떻게 연동하는지를 확인합니다.
kafka cluster에 연동하기 위해 Kubernetes 기반 Kafka cluster 환경구성 글에서 배포한 kafka-client를 사용합니다.

1. kafka topics 목록 확인
k8s 에서 kafka-client pod 연계하여 kafka cli 명령어로 topic 목록을 조회합니다.

kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --list

kafka를 배포 후 아래의 명령어를 수행하면 생성된 topic 이 없기 때문에 목록 조회 결과가 없습니다.

2. kafka topics 생성
kafka cluster에 테스트용으로 topic 명칭을 “test01” 으로 kafka topics을 생성합니다.

kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --create --topic test01 --partitions 1 --replication-factor 3

[수행 결과]
Created topic “test01”.

topic 목록 다시 조회를 수행합니다.
kubectl exec -it kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --list

[수행 결과]
test01

3. kafka-console-consumer 로 메시지 수신 대기.
kafka-console-consumer를 수행합니다.

kubectl exec -it kafka-client -- kafka-console-consumer --bootstrap-server kafka:9092 --topic test01

4. kafka-console-producer 로 메시지 송신
kafka-console-producer를 실행합니다.
kubectl exec -it kafka-client -- kafka-console-producer --broker-list kafka:9092 --topic test01

kafka-console-producer 창에서 메시시를 송신하기 위해 “test” 입력하고 엔터

kafka-console-consumer 화면