wal2sql 是一个逻辑解码输出扩展。这意味着扩展可以访问 INSERT 和 UPDATE 生成的元组。 此外,根据配置的复制标识符,可以访问 UPDATE/DELETE 旧行版本。 更改可以通过流协议(逻辑复制槽)或特殊的 SQL API 进行消费。
格式版本 1 会为每个事务生成一个 JSON 对象。该 JSON 对象中包含所有新旧元组。 此外,还有选项可以包括事务时间戳、模式限定符、数据类型和事务 ID 等属性。
格式版本 2 会为每个元组生成一个 JSON 对象。可以选择为事务的开始和结束生成 JSON 对象。 此外,还有各种选项可用于包括属性。
您需要在 lightdb.conf 中设置至少以下参数
# lightdb.conf wal_level = logical
include-transaction
(boolean
)
发出记录,表示每个事务的开始和结束。默认值为 true。
include-xids
(boolean
)
将 XID 添加到每个更改集。默认值为 false。
include-timestamp
(boolean
)
将时间戳添加到每个更改集。默认值为 false。
include-pk
(boolean
)
将主键信息添加为 pk。包括列名和数据类型。默认值为 false。
include-origin
(boolean
)
添加数据的来源。默认值为 false。
include-schemas
(boolean
)
将模式添加到每个更改中。默认值为 true。
include-types
(boolean
)
将类型添加到每个更改中。默认值为 true。
include-type-oids
(boolean
)
添加类型 OID。默认值为 false。
include-typmod
(boolean
)
对于有修饰符的类型添加修饰符(例如 varchar(20) 而不是 varchar)。默认值为 true。
include-domain-data-type
(boolean
)
将域名替换为底层数据类型。默认值为 false。
include-column-positions
(boolean
)
添加列位置(pg_attribute.attnum)。默认值为 false。
include-not-null
(boolean
)
将非空信息添加为列选项。默认值为 false。
include-default
(boolean
)
添加默认表达式。默认值为 false。
pretty-print
(boolean
)
向 JSON 结构添加空格和缩进。默认值为 false。
write-in-chunks
(boolean
)
每次更改后写入,而不是每个更改集后写入。仅在格式版本为 1 时使用。默认值为 false。
include-lsn
(boolean
)
在每个更改集中添加 nextlsn。默认值为 false。
actions
(boolean
)
定义将发送哪些操作。默认值为所有操作(插入、更新、删除和截断)。 但是,如果您使用格式版本 1,则禁用截断(向后兼容)。
filter-origins
(boolean
)
排除来自指定来源的更改。默认为空,意味着没有来源将被过滤。它是一个逗号分隔的值。
filter-tables
(boolean
)
排除来自指定表的行。默认为空,意味着没有表将被过滤。 它是一个逗号分隔的值。表应该被模式限定。 *.foo 表示所有模式中的表 foo,bar.* 表示模式 bar 中的所有表。 特殊字符(空格、单引号、逗号、点、星号)必须用反斜杠转义。 模式和表区分大小写。表 "public"."Foo bar" 应该被指定为 public.Foo\ bar。
add-tables
(boolean
)
仅包括来自指定表的行。默认是来自所有模式的所有表。它具有与 filter-tables 相同的规则。
filter-msg-prefixes
(boolean
)
如果前缀在列表中,则排除消息。默认为空,意味着没有消息将被过滤。它是一个逗号分隔的值。
add-msg-prefixes
(boolean
)
只包括前缀在列表中的消息。默认为所有前缀。这是一个逗号分隔的值。wal2sql在此参数之前应用filter-msg-prefixes。
format-version
(integer
)
定义要使用的格式。默认为1。
从wal2sql插件中获取更改(JSON对象)有两种方法:通过 ltdts_recvlogical 调用函数或使用SQL。
除上述配置外,还需要配置一个复制连接来使用 ltdts_recvlogical。
在 lt_hba.conf 中添加复制连接规则。
local mydatabase myuser trust
另外,在 lightdb.conf 中设置 max_wal_senders:
max_wal_senders = 1
如果更改了 max_wal_senders,需要重新启动。 现在可以尝试使用 wal2sql。在一个终端中执行以下命令:
# create a logical repliction slot name as test_oracle_slot ltdts_recvlogical -d postgres --slot=test_oracle_slot --create-slot -P wal2sql # delete a logical repliction slot ltdts_recvlogical -d postgres --slot=test_oracle_slot --drop-slot # print output in the first terminal ltdts_recvlogical -d postgres --slot test_oracle_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2sql -f - # forward output to oracle。 ltdts_recvlogical --tnsname=TEST --oracle-username=scott --oracle-password=tiger --start --slot=test_oracle_slot -o include-types=false -o include-type-oids=true -d postgres -h 10.20.30.193 -p 9999 -f debug.dat
保存以下代码并执行:
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2sql'); BEGIN; INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table3_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2sql', 'this message will be delivered'); DELETE FROM table3_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2sql', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir'); -- it is not added to stream because there isn't a pk or a replica identity UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2sql'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); DROP TABLE table3_with_pk; DROP TABLE table3_without_pk;