이번 글에서는 k8s(Kubernetes)에 kafka connector를 배포하고, source 데이터베이스에 postgresql를 배포하고, target 데이터베이스에 source 데이터베이스의 변경 내용이 목적지 데이터베이스의 postgresql 데이터베이스의 데이터를 실시간 전송하는 위한 환경구성하는 방법을 올립니다.

kafka cluster 구성와 kafka-client pod를 배포하는 방법은 Kubernetes 기반 Kafka cluster 글을 참조하기 바랍니다.
이글에서 나오는 소스는 여기를 참조하세요.

k8s에 postgresql 데이터베이스를 배포하기 위해 아래의 k8s 의 object들을 source/target 데이터베이스용으로 만들것 입니다.

  • PVC(Persistent Volume Claim) : postgresql 데이터를 저장할 dynamic pv(Persistent Volume) 를 요청할 PVC을 생성
  • configMap : postgresql 사용자 정보/비밀번호/데이터베이스 환경정보를 저장할 configMap 생성
  • Postgresql Pod : postgresql 데이터베이스 container를 배포하기 위한 yaml 작성 및 pod 생성
  • Postgresql Service : postgresql 데이터베이스 pod에 접속을 위한 k8s의 postgresql service 생성

Source 데이터베이스를 위한 구성을 아래와 같이 수행합니다.

1. source postgresql 데이터를 저장할 dynamic pv(Persistent Volume) 를 요청할 PVC yaml 파일 작성
postgresql 데이터를 저장할 volume storage claim 파일 작성
아래 storageClassName 부분은 자신의 k8s 환경에 설치되어 있는 storageClass를 조회해서 자신의 storageClass 명으로 입력합니다.
[postgres-pvc]

2. postgresql 사용자 정보/비밀번호/데이터베이스 환경정보를 저장할 ConfigMap 생성
postgres 데이터베이스에 접속 정보 환경정보를 저장할 ConfigMap을 생성합니다.

3. postgresql 데이터베이스 container를 배포하기 위한 yaml 작성 및 pod 생성
postgresql 데이터베이스 container를 배포하기 위한 yaml 작성 및 pod 생성합니다.
postgresql yaml 파일에는 pod에서 사용하는 storage volume을 사용하기 위해 pvc를 volume mount 해야하며,
데이터베이스의 사용자와 비밀번호 및 데이터베이스 명을 이전에 생성한 ConfigMap에서 연결을 해줘야 합니다.
postgresql 의 default port 는 5432로 설정합니다.

4. postgresql 데이터베이스 pod에 접속을 위한 k8s의 postgresql service 생성
외부 어플리케이션에서 접속하기 위한 접근 정보 명칭으로 service name을 정의합니다.

target 데이터베이스를 위한 구성을 아래와 같이 수행합니다.

1. source postgresql 데이터를 저장할 dynamic pv(Persistent Volume) 를 요청할 PVC yaml 파일 작성
postgresql 데이터를 저장할 volume storage claim 파일 작성
아래 storageClassName 부분은 자신의 k8s 환경에 설치되어 있는 storageClass를 조회해서 자신의 storageClass 명으로 입력합니다.
[postgres-pvc]

2. postgresql 사용자 정보/비밀번호/데이터베이스 환경정보를 저장할 ConfigMap 생성
postgres 데이터베이스에 접속 정보 환경정보를 저장할 ConfigMap을 생성합니다.

3. postgresql 데이터베이스 container를 배포하기 위한 yaml 작성 및 pod 생성
postgresql 데이터베이스 container를 배포하기 위한 yaml 작성 및 pod 생성합니다.
postgresql yaml 파일에는 pod에서 사용하는 storage volume을 사용하기 위해 pvc를 volume mount 해야하며,
데이터베이스의 사용자와 비밀번호 및 데이터베이스 명을 이전에 생성한 ConfigMap에서 연결을 해줘야 합니다.
postgresql 의 default port 는 5432로 설정합니다.

4. postgresql 데이터베이스 pod에 접속을 위한 k8s의 postgresql service 생성
외부 어플리케이션에서 접속하기 위한 접근 정보 명칭으로 service name을 정의합니다.

kafka postgresql Source Connector 설정

Kafka-connector는 default로 postgres source jdbc driver가 설치되어 있어서 추가 driver없이 환경 구성이 가능합니다.
그 이외 데이터베이스 driver들은 사용자가 직접 설치를 해주어야 합니다.
k8s에 설치된 kafka-connector service 명을 확인하고 uri 부분에 service 명과 port를 설정합니다.

kubectl get svc -l app=cp-kafka-connect

kubectl exec -it kafka-client -- \
curl -X POST  http://my-connect-cp-kafka-connect:8083/connectors -H "Content-Type: application/json" -d '
 {
  "name": "jdbc_source_postgres_01",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "incrementing.column.name": "id",
    "tasks.max": "10",
    "mode": "incrementing",
    "topic.prefix": "postgres_source_",
    "connection.user": "postgres",
    "connection.password": "admin1234!",
    "poll.interval.ms": "3600",
    "table.poll.interval.ms": "5000",
    "numeric.mapping": "best_fit",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres"
    }
 }'

kafka postgresql Sink Connector 설정

kubectl exec -it kafka-client -- \
curl -X POST  http://my-connect-cp-kafka-connect:8083/connectors -H "Content-Type: application/json" -d '
{
  "name": "jdbc_sink_postgres_01",
  "config": {
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres-target:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "admin1234!",
    "poll.interval.ms": "3600",
    "table.poll.interval.ms": "5000",
    "tasks.max":"3",
    "topics.regex": "^postgres_.*",
    "auto.create": "true",
    "auto.evolve": "true",
    "schemas.enable" : "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
}
'

이전글 관련 참조

Kubernetes 기반 Redis – master/slave 구성
Kubernetes 기반 Redis – redis-client 구성
Kubernetes 기반 Kafka cluster – 환경 구성