CData Software Blog

クラウド連携のCData Software の技術ブログです。

ログベースによる変更データキャプチャ(CDC)で PostgreSQL → BigQuery のレプリケーションをやってみた:CData Sync


こんにちは、テクニカルサポートの宮本です!

最近、Debezium などの変更データキャプチャに特化した SaaS がいくつか提供されていたりと、Change Data Capture(CDC)というワードを見かける機会が増えてきたなと個人的に感じています。

Change Data Capture - Modern Data Stack | Modern Data Stack

CData Sync においては以前ご紹介したように SQLServer コネクタによる CDC での変更データ取得はできるようになっていましたが、
www.cdatablog.jp

新たに PostgreSQL コネクタでも CDC 機能のサポートが行われました!

www.cdata.com


今回はその設定方法からレプリケートジョブを実行して変更データを連携するところを実際に行ってみたいと思います。

PostgreSQL 変更データキャプチャ(CDC)の種類

PostgreSQL に CDC 方法としては3つのアプローチがあります。

  • トリガー
  • クエリベース
  • ログベース

では簡単にそれぞれについてみていきます。

トリガー
トリガーで行うCDCは、対象テーブルにおいてレコードに変更が入ったタイミングで別なテーブルに変更情報を連携します。なのでInsert、Update、Deleteの内容をリアルタイムでキャプチャすることが可能です。
しかし別なテーブルを管理することでの運用の複雑さであったり、変更が入ったタイミングでトリガーが実行されることで元のステートメントの実行時間が増えてしまったりと、デメリットもあります。

クエリベース
クエリベースは対象テーブルにある更新日時などのタイムスタンプ項目に対してクエリして取得レコードを絞ります。PostgreSQL 自体の設定など変更することなく、" WHERE updated_at> '最終更新日時' " ですぐに実行できるのがメリットではあるものの、データ量によってはPostgreSQL 自体に負荷を掛けてしまったり、そもそも更新日時をテーブルで持つ必要がある、削除レコードには対応していないなどがデメリットとして存在します。

ログベース
ログベースは WAL(ログ先行書き込み)というトランザクションログを Logical Decoding という機能でトランザクションログの内容を論理的なレコードで抽出して変更情報をキャプチャする方法です。

トランザクションログを参照しているので、リアルタイムで Insert、Update、Delete の変更データをキャプチャできるのに加え、ログを直接参照するので DB へのパフォーマンスにも影響がないのが特徴です。また、変更データを保持した別なテーブルを用意する必要がないので維持管理でも楽です。デメリットとしては古いバージョンでは利用することができない機能となってます。
CData Sync ではこのログベースによる CDC を今回サポートしました。


上記内容をまとめてみるとこのようになります。

種類 リアルタイム 削除レコード対応 パフォーマンス 古いバージョン対応
トリガー
Ver9.1から
クエリベース ×
ログベース
Ver9.4から

手順

ではさっそく確認していきたいと思いますが、必要なものは以下のものだけでOKです。

PostgreSQL の CDC 設定

まずは PostgreSQL の CDC 設定状況を確認していきますので、下記クエリを実行します。

SELECT name, setting FROM pg_settings WHERE name='wal_level';

デフォルトでは下記のように wal_level の setting の値が replica になっているかと思いますので、これを logical に変更します。logical にすることによって Logical Decoding をサポートするのに必要な情報を追加するようになります。

設定変更は postgresql.conf を直接修正する形になります。
Windows の場合の参考パス:
C:\Program Files\PostgreSQL\14\data\postgresql.conf

これを開いて、 wal_level = の値を logical に変更します。
wal_level = logical

設定変更後は PostgreSQL を再起動して再度クエリを実行し、setting の値が logical に変更されてたらOKです。

レプリケーションスロットの割り当て

レプリケーションスロットという機能を使って WAL ファイルから Logical Decoding で変更情報をキャプチャするための設定を行います。

といっても、下記クエリを実行するのみです。

SELECT * FROM pg_create_logical_replication_slot('cdatasync_replication_slot', 'test_decoding');

ちなみに pg_create_logical_replication_slot 関数の引数は下記のとおりです。

これで PostgreSQL 側の CDC の設定が完了です。

CDataSyncのインストール

CData Sync はセルフホスティング or AWS で提供している ETL/ELT ツールで、エンジニアでなくても容易に SaaS データを同期先 DB にレプリケートすることができるツールです。

自動データレプリケーション | CData Software Japan

インストール方法は以下のハンズオン記事を参照してみてください。(すぐ終わります)

www.cdatablog.jp

接続情報の作成

PostgreSQL の接続情報を作成していきますので、接続一覧画面で PostgreSQL アイコンをクリックします。(もし表示されてない場合は右下にあるアイコンの "+ Add More" から追加してください)

接続情報をそれぞれ入力したら接続テストして保存ボタンをクリックします。

同じような手順で BigQuery の接続設定も行います。
BigQuery ではOAuth認証とサービスアカウントどちらでも利用可能ですが、OAuth認証の場合はプロジェクト名、データセット名を入力してから接続テストをクリックして CDataSync からの接続許可を行います。

これで接続設定が完了しました。

ジョブ作成

では次にジョブを作成していきますので、ジョブ一覧画面で「ジョブを追加」ボタンをクリックします。表示されるウィンドウで下記を入力・選択します。

項目名 設定内容
ジョブ名 任意のジョブ名
ソース PostgreSQL
同期先 BigQuery
レプリケーションの種類 変更データキャプチャ
Logical Replication Slot PostgreSQLレプリケーション・スロット名

入力したら作成ボタンでジョブを作成します。

次に連携したいテーブルを選択すると、

自動的に連携クエリが作成されます。

ちなみに連携元テーブルは更新日時項目などのタイムスタンプ項目を持っていないシンプルなテーブルになります。

ジョブ実行

それではジョブを実行してみます!チェックボックスをオンにして実行ボタンをクリックするとジョブが実行され、正常に同期されると同期件数が右側に表示されます。今回は176件のレコードが BigQuery に連携されたようです。

CDC機能の確認

CDC 機能の確認としてまずはもう一度ジョブを実行してみます。
PostgreSQL 側はまだ変更していないので、CDC が有効であればジョブの結果は0件となる想定ですが、ちゃんと0件 という結果が表示されました。

では最後にPostgreSQL 側に2件レコードを追加して確認してみます。

Insert into public."Account_cdc"("Id", "Name") Values('1234ABCD5678AAAA01', 'cdata cdc test 1');
Insert into public."Account_cdc"("Id", "Name") Values('1234ABCD5678AAAA02', 'cdata cdc test 2');

この状態でジョブをもう一度実行してみると、2件だけ BigQuery に連携してくれました。

BigQuery 側をみてみると、今追加したレコードを確認することができました。

おわりに

いかがでしたでしょうか。PostgreSQL の CDC 機能で BigQuery に更新日時を持たないテーブルでも簡単に差分データを同期することができました。今回は同期先に BigQuery 使ってみましたが、Snowflake でも他の RDB でも同様に PostgreSQL の CDC レプリケーションが可能です。

CDataSync は 30日間の無償トライアル利用が可能ですので是非お試しください!

www.cdata.com