SW Engineering/DevOps, SRE

[Kafka] Kafka와 Debezium을 활용하여 SQL Server의 데이터를 실시간으로 PostgreSQL로 복제하기

SungWookKang 2024. 2. 26. 10:57
반응형

[Kafka] Kafka Debezium 활용하여 SQL Server 데이터를 실시간으로 PostgreSQL 복제하기

 

l  Kafka 3.6.2, Debezium 2.5, SQL Server 2019, PostgreSQL 12

 

Kafka(이하 카프카”) 카프카 커넥터인 Debezium 활용하여 SQL Server에서 발생하는 실시간 DML 캡처하여 PostgreSQL 데이터를 복제하는 방법에 대해서 알아본다. 이번 포스트에서는 실시간 데이터 복제를 위한 구성 정도로만 다루고, 단계에서의 상세한 기술 내용은 추후 다른 포스팅에서 다룰 예정이다.

 

[Architecture]

이번에 구축하려는 시스템의 아키텍처는 아래와 같다. 어플리케이션에서 MS SQL Server 데이터를 변경하면 SQL Server CDC 기능을 사용하여 변경 사항을 캡처한다. Debezium 변경 사항을 확인하여 Kafka 데이터를 입력한다. 그리고 카프카에서 PostgreSQL 실시간 데이터를 복제한다.

 

시스템 버전
Kafka 3.6.2
Debezium 2.5.1
SQL Server 2019
PostgreSQL 12

 

[카프카 설치]

카프카 설치는 아래 링크를 참고한다.

l  [Kafka] Kafka 클러스터 4노드 구성 - Controller, Broker 혼합해서 구성하기 : https://sungwookkang.com/entry/Kafka-Kafka-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-4%EB%85%B8%EB%93%9C-%EA%B5%AC%EC%84%B1-Controller-Broker-%ED%98%BC%ED%95%A9%ED%95%B4%EC%84%9C-%EA%B5%AC%EC%84%B1%ED%95%98%EA%B8%B0

 

 

[Debezium 커넥터 설치]

카프카에서 MS SQL ServerPostgreSQL 연결하기 위한 Debezium 커넥터를 다운로드하여 설치한다.

l  Debezium : https://debezium.io/releases/2.5/#installation

 

 

다운로드 압축을 해제한다. 필자의 경우 카프카 디렉터리에 plugins라는 디렉토리를 생성하여  파일을 위치하였다. 사용자에 따라 파일 위치를 다르게 가능하며, 이후 카프카 커넥터 설정에서 해당 위치의 경로를 지정하여 사용하기 때문에 경로가 달라도 상관없다.

#SQL Server Connector
tar xvf debezium-connector-sqlserver-2.5.0.Final-plugin.tar.gz -C /usr/local/kafka/plugins/
#PostgreSQL Connector
tar -xvzf debezium-connector-jdbc-2.2.1.Final-plugin.tar.gz -C /usr/local/kafka/plugins/

 

 

[카프카에서 커넥터 설정 서비스 시작]

카프카에서 커넥터를 사용하기 위해 커넥터의 경로를 설정하고, 서비스를 시작한다. 카프카 디렉토리에서 connect-distributed.properties에서 아래 항목을 수정한다.

cd /usr/local/kafka
 
vi ./config/connect-distributed.properties

 

#Broker Node IP
bootstrap.servers=XXX.XXXX.XXX.XXX:9092
 
#Debezium path
plugin.path=/usr/local/kafka/plugins

 

커넥터 서비스를 시작한다.

cd /usr/local/kafka
 
./bin/connect-distributed.sh -daemon ./config/connect-distributed.properties

 

커넥터가 정상적으로 실행되면 8083포트를 통해서 정보를 확인할 있다.

curl localhost:8083/connector-plugins | jq

 

 

 

[SQL Server CDC 활성화]

SQL Server에서 데이터의 변경을 실시간으로 캡처하기 위해서는 CDC 기능을 활성화한다. SQL Server CDC 대한 자세한 내용은 아래 링크를 참고한다.

l  Enable and disable change data capture : https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver16

 

 

실습을 위한 데이터베이스 테이블을 생성한다. 테이블에는 반드시 Primary key 컬럼이 필요하다.

CREATE DATABASE swkang_test
GO
 
USE swkang_test
GO
 
CREATE TABLE tbl_a(
num int primary key,
name nvarchar(50),
reg_date datetime
)
GO

 

 

CDC 기능을 활성화한다.

USE swkang_test
GO
 
-- CDC Enable Database
EXEC sys.sp_cdc_enable_db
GO
/*
EXEC sys.sp_cdc_disable_db
GO
*/
 
-- CDC Enable Table
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'tbl_a',
@role_name = '',
@supports_net_changes = 1
GO
/*
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name   = N'tbl_a',
@capture_instance = N'dbo_tbl_a'
*/

 

 

데이터베이스가 CDC 활성화가 되었는지 확인하는 방법은 아래 스크립트를 사용한다.

select name, is_cdc_enabled from sys.databases where is_cdc_enabled = 1

 

 

 

CDC 대상 테이블을 확인하는 방법은 아래 스크립트를 사용한다.

Select name, is_tracked_by_cdc from sys.tables;

 

 

 

[Source Connector 정보 등록 (SQL Server)]

Debezium API 사용하여, 소스 커넥터 정보를 입력한다. 필자는 postman 툴을 사용하여 API 호출하였다. ( 속성에 대한 설명은 이번 포스트에서는 다루지 않는다.)

http://kafka-ip:8083/connectors

 

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "topic.prefix": "server1",
        "database.hostname" : "SQL Server IP",
        "database.server.name": "DB Name (Friendly name)",
        "database.port" : "1433",
        "database.user" : "swkang_cdc",
        "database.password" : "******",
        "database.names" : "swkang_test",
        "include.schema.changes": "true",
        "include.schema.comments" : "true",
        "tombstones.on.delete":"true",
        "schema.history.internal.kafka.bootstrap.servers" : "localhost:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory1",
        "database.encrypt": "false"
    }
}

 

 

 

 

[Target Connector 정보 등록 (PostgreSQL)]

소스의 데이터를 타겟으로 복제하기 위해 타겟 데이터베이스인 PostgreSQL 연결 정보를 등록한다. ( 속성에 대한 설명은 이번 포스트에서는 다루지 않는다.)

{
  "name": "postgresql_target_sink", 
  "config": {
      "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 
      "tasks.max": "1", 
      "connection.url": "jdbc:postgresql://localhost/swkang_test", 
      "connection.username": "cdc_user", 
      "connection.password": "******", 
      "insert.mode": "upsert", 
      "delete.enabled": "true", 
      "primary.key.mode": "record_key", 
      "schema.evolution": "basic", 
      "database.time_zone": "UTC", 
      "topics": "server1.swkang_test.dbo.tbl_a"
  }
}

 

[실시간 데이터 복제 실습]

SQL Server에서 데이터를 입력한 다음 PostgreSQL에서 데이터를 확인한다. 이때 PostgreSQL에서는 테이블을 생성하지 않아도 SQL Server 스키마 정보를 읽고 테이블을 자동으로 생성하고, 데이터를 복제한다.

 

SQL Server에서 아래 스크립트를 실행하여 데이터를 입력한다.

insert into tbl_a values (1, 'a_skang1', 'skang1@nowcom.com')
insert into tbl_a values (2, 'a_skang2', 'skang2@nowcom.com')
insert into tbl_a values (3, 'a_skang3', 'skang2@nowcom.com')
insert into tbl_a values (4, 'a_skang4', 'skang4@nowcom.com')
insert into tbl_a values (5, 'a_skang5', 'skang5@nowcom.com')
insert into tbl_a values (6, 'a_skang6', 'skang6@nowcom.com')
insert into tbl_a values (7, 'a_skang6', 'skang6@nowcom.com')
insert into tbl_a values (8, 'a_skang8', 'skang8@nowcom.com')

 

 

PostgreSQL에서 데이터를 확인한다. 실시간으로 MS SQL Server 데이터가 PostgreSQL 복제된 것을 확인할 있다.

 

 

 

이번 포스트에서는 카프카의 Debezium 활용하여 SQL Server에서 PostgreSQL 복제를 진행하였다. Debezium 커넥터에서 제공하는 여러 커넥터를 활용하면 다양한 소스 데이터베이스에서 다양한 타겟 데이터베이스로 데이터를 실시간으로 복제할 있다.

 

 

[참고자료]

l  Kafka Connect Elasticsearch Connector in Actio : https://www.confluent.io/blog/kafka-elasticsearch-connector-tutorial/

l  How to Use Kafka Connect - Get Started : https://docs.confluent.io/platform/current/connect/userguide.html#connect-installing-plugins

l  Debezium Release Series 2.5 : https://debezium.io/releases/2.5/

l  Enable and disable change data capture : https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver16

 

 

 

 

2024-02-26 / Sungwook Kang / https://sungwookkang.com

 

 

KAFKA, 아파치 카프카, Apache Kafka, 데비지움, Debezium, 데이터복제, Kafka CDC, 카프카 스트림즈, 데이터 실시간 복제, 데이터 엔지니어

반응형