E.51. wal2sql

E.51.1. 配置
E.51.2. 配置参数
E.51.3. 参数示例
E.51.4. 注意

wal2sql 是一个逻辑解码输出扩展。这意味着扩展可以访问 INSERT 和 UPDATE 生成的元组。 此外,根据配置的复制标识符,可以访问 UPDATE/DELETE 旧行版本。 更改可以通过流协议(逻辑复制槽)或特殊的 SQL API 进行消费。

格式版本 1 会为每个事务生成一个 JSON 对象。该 JSON 对象中包含所有新旧元组。 此外,还有选项可以包括事务时间戳、模式限定符、数据类型和事务 ID 等属性。

格式版本 2 会为每个元组生成一个 JSON 对象。可以选择为事务的开始和结束生成 JSON 对象。 此外,还有各种选项可用于包括属性。

E.51.1. 配置

您需要在 lightdb.conf 中设置至少以下参数

        # lightdb.conf

        wal_level = logical
        

E.51.2. 配置参数

include-transaction (boolean)

发出记录,表示每个事务的开始和结束。默认值为 true。

include-xids (boolean)

将 XID 添加到每个更改集。默认值为 false。

include-timestamp (boolean)

将时间戳添加到每个更改集。默认值为 false。

include-pk (boolean)

将主键信息添加为 pk。包括列名和数据类型。默认值为 false。

include-origin (boolean)

添加数据的来源。默认值为 false。

include-schemas (boolean)

将模式添加到每个更改中。默认值为 true。

include-types (boolean)

将类型添加到每个更改中。默认值为 true。

include-type-oids (boolean)

添加类型 OID。默认值为 false。

include-typmod (boolean)

对于有修饰符的类型添加修饰符(例如 varchar(20) 而不是 varchar)。默认值为 true。

include-domain-data-type (boolean)

将域名替换为底层数据类型。默认值为 false。

include-column-positions (boolean)

添加列位置(pg_attribute.attnum)。默认值为 false。

include-not-null (boolean)

将非空信息添加为列选项。默认值为 false。

include-default (boolean)

添加默认表达式。默认值为 false。

pretty-print (boolean)

向 JSON 结构添加空格和缩进。默认值为 false。

write-in-chunks (boolean)

每次更改后写入,而不是每个更改集后写入。仅在格式版本为 1 时使用。默认值为 false。

include-lsn (boolean)

在每个更改集中添加 nextlsn。默认值为 false。

actions (boolean)

定义将发送哪些操作。默认值为所有操作(插入、更新、删除和截断)。 但是,如果您使用格式版本 1,则禁用截断(向后兼容)。

filter-origins (boolean)

排除来自指定来源的更改。默认为空,意味着没有来源将被过滤。它是一个逗号分隔的值。

filter-tables (boolean)

排除来自指定表的行。默认为空,意味着没有表将被过滤。 它是一个逗号分隔的值。表应该被模式限定。 *.foo 表示所有模式中的表 foo,bar.* 表示模式 bar 中的所有表。 特殊字符(空格、单引号、逗号、点、星号)必须用反斜杠转义。 模式和表区分大小写。表 "public"."Foo bar" 应该被指定为 public.Foo\ bar。

add-tables (boolean)

仅包括来自指定表的行。默认是来自所有模式的所有表。它具有与 filter-tables 相同的规则。

filter-msg-prefixes (boolean)

如果前缀在列表中,则排除消息。默认为空,意味着没有消息将被过滤。它是一个逗号分隔的值。

add-msg-prefixes (boolean)

只包括前缀在列表中的消息。默认为所有前缀。这是一个逗号分隔的值。wal2sql在此参数之前应用filter-msg-prefixes。

format-version (integer)

定义要使用的格式。默认为1。

E.51.3. 参数示例

从wal2sql插件中获取更改(JSON对象)有两种方法:通过 ltdts_recvlogical 调用函数或使用SQL。

E.51.3.1. ltdts_recvlogical

除上述配置外,还需要配置一个复制连接来使用 ltdts_recvlogical

在 lt_hba.conf 中添加复制连接规则。

            local    mydatabase      myuser                     trust
            

另外,在 lightdb.conf 中设置 max_wal_senders:

            max_wal_senders = 1
            

如果更改了 max_wal_senders,需要重新启动。 现在可以尝试使用 wal2sql。在一个终端中执行以下命令:

            # create a logical repliction slot name as test_oracle_slot
            ltdts_recvlogical -d postgres  --slot=test_oracle_slot --create-slot -P wal2sql

            # delete a logical repliction slot
            ltdts_recvlogical -d postgres  --slot=test_oracle_slot --drop-slot

            # print output in the first terminal 
            ltdts_recvlogical -d postgres --slot test_oracle_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2sql -f -

            # forward output to oracle。
            ltdts_recvlogical --tnsname=TEST --oracle-username=scott --oracle-password=tiger --start --slot=test_oracle_slot -o include-types=false
            -o include-type-oids=true -d postgres -h 10.20.30.193 -p 9999 -f debug.dat
            

E.51.3.2. SQL

保存以下代码并执行:

            CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
            CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

            SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2sql');

            BEGIN;
            INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
            INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
            INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
            SELECT pg_logical_emit_message(true, 'wal2sql', 'this message will be delivered');
            DELETE FROM table3_with_pk WHERE a < 3;
            SELECT pg_logical_emit_message(false, 'wal2sql', 'this non-transactional message will be delivered even if you rollback the transaction');

            INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
            -- it is not added to stream because there isn't a pk or a replica identity
            UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
            COMMIT;

            SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2sql');
            SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

            DROP TABLE table3_with_pk;
            DROP TABLE table3_without_pk;
            

E.51.4. 注意

  1. wal2sql 不支持在 龙芯架构 上运行。

  2. 高可用性 中,请启动 ltdts_logicalrepl_copier.sh 将 slot 从主库复制到备份库。

  3. 高可用性 中,当发生故障转移时,请重新启动新的主库 LightDB 以使 slot 生效。