下面的例子演示了使用 SQL 接口控制逻辑解码。
在你能使用逻辑解码之前,你必须设置wal_level为
logical
,并且max_replication_slots
必须至少被设置为 1。然后,你应该作为一个超级用户连接到目标数据库(在下面
的例子中是postgres
)。
lightdb@postgres=# -- 使用输出插件'wal2sql'创建一个名为'regression_slot'的槽 lightdb@postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'wal2sql'); slot_name | lsn -----------------+------------ regression_slot | 0/23F5E930 (1 row) lightdb@postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn -----------------+---------+-----------+----------+--------+-------------+--------------------- regression_slot | wal2sql | logical | postgres | f | 0/23F5E8F8 | 0/23F5E930 (1 row) lightdb@postgres=# -- 目前还看不到更改 lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) lightdb@postgres=# CREATE TABLE data(id serial primary key, data text); CREATE TABLE lightdb@postgres=# -- DDL 没有被复制,因此你将看到的东西只是空 json lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data ------------+------+---------- 0/23F914F8 | 2048 | {"C":[]} lightdb@postgres=# -- 一旦读到更改,它们会被消费掉并且不会在一个后续调用中被发出: lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) lightdb@postgres=# BEGIN; lightdb@postgres=*# INSERT INTO data(data) VALUES('1'); lightdb@postgres=*# INSERT INTO data(data) VALUES('2'); lightdb@postgres=*# COMMIT; lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data ------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 0/23FC83D8 | 2054 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[1,"1"],"pk":{"pkv":[1]}},{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[2,"2"],"pk":{"pkv":[2]}}]} (1 row) lightdb@postgres=# INSERT INTO data(data) VALUES('3'); lightdb@postgres=# -- 你也可以不消费更改而在更改流中先看一看 lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data ------------+------+-------------------------------------------------------------------------------------------------------------------------- 0/23FC8928 | 2055 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]} (1 row) lightdb@postgres=# -- 接下来对 pg_logical_slot_peek_changes() 的调用再次返回相同的更改 lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data ------------+------+-------------------------------------------------------------------------------------------------------------------------- 0/23FC8928 | 2055 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]} (1 row) lightdb@postgres=# -- 可以向输出插件传递选项来影响格式化 lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); lsn | xid | data ------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------- 0/23FC8928 | 2055 | {"timestamp":"2024-07-05 17:31:08.395304+08","C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]} (1 row) lightdb@postgres=# -- 当不再需要一个槽后记住销毁它以停止消耗服务器资源: lightdb@postgres=# SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot ----------------------- (1 row)
下面的例子展示了如何在流复制协议上使用 LightDB 发布所包括的程序lt_recvlogical来控制逻辑解码。这要求设置客户端认证以允许复制连接(见Section 25.2.5.1),并且把max_wal_senders
设置成足够高以允许一个额外的连接。
$ lt_recvlogical -d postgres --slot=test --create-slot $ lt_recvlogical -d postgres --slot=test --start -f - Control+Z $ ltsql -d postgres -c "INSERT INTO data(data) VALUES('4');" $ fg {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[4,"4"],"pk":{"pkv":[4]}}]} Control+C $ lt_recvlogical -d postgres --slot=test --drop-slot