Azure Database for PostgreSQL 灵活服务器中的逻辑复制和逻辑解码
适用于:Azure Database for PostgreSQL - 灵活服务器
Azure Database for PostgreSQL 灵活服务器支持以下逻辑数据提取和复制方法:
逻辑复制
逻辑解码:通过对预写日志 (WAL) 的内容进行解码来实现。
比较逻辑复制和逻辑解码
逻辑复制和逻辑解码具有一些相似之处。 这两项技术都具有以下特点:
允许复制 Postgres 之外的数据。
使用预写日志 (WAL) 作为更改源。
使用逻辑复制槽发出数据。 槽表示更改流。
使用表的 REPLICA IDENTITY 属性确定可以发出的更改。
不要复制 DDL 更改。
这两种技术存在不同之处:
逻辑复制:
- 允许指定要复制的表或表集。
逻辑解码:
- 提取数据库中所有表的更改。
逻辑复制和逻辑解码的先决条件
转到门户中的服务器参数页。
将服务器参数
wal_level
设置为logical
。如果要使用 pglogical 扩展,请搜索
shared_preload_libraries
和azure.extensions
参数,然后从下拉列表框中选择pglogical
。请将
max_worker_processes
参数值更新为一个至少 16 的数字。 否则,你可能会遇到WARNING: out of background worker slots
之类的问题。保存更改并重启服务器以应用更改。
确认 Azure Database for PostgreSQL 灵活服务器实例允许来自连接资源的网络流量。
授予管理员用户复制权限。
ALTER ROLE <adminname> WITH REPLICATION;
你可能需要确保你使用的角色对要复制的架构具有特权。 否则,你可能会遇到诸如
Permission denied for schema
之类的错误。
注意
最好将复制用户与常规管理员帐户分开。
使用逻辑复制和逻辑解码
使用本机逻辑复制是从 Azure Database for PostgreSQL 灵活服务器复制数据的最简单方法。 你可以使用 SQL 接口或流式处理协议来使用更改。 你还可以使用 SQL 接口通过逻辑解码来使用更改。
本机逻辑复制
逻辑复制使用术语“发布服务器”和“订阅服务器”。
- 发布者是要发送其中的数据的 Azure Database for PostgreSQL 灵活服务器数据库。
- 订阅者是要将数据发送到的 Azure Database for PostgreSQL 灵活服务器数据库。
下面是可用于尝试逻辑复制的一些示例代码。
连接到发布服务器数据库。 创建表并添加一些数据。
CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT); INSERT INTO basic VALUES (1, 'apple'); INSERT INTO basic VALUES (2, 'banana');
为表创建发布。
CREATE PUBLICATION pub FOR TABLE basic;
连接到订阅服务器数据库。 使用发布服务器上的同一架构创建表。
CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT);
创建一个将连接到你之前创建的发布的订阅。
CREATE SUBSCRIPTION sub CONNECTION 'host=<server>.postgres.database.chinacloudapi.cn user=<rep_user> dbname=<dbname> password=<password>' PUBLICATION pub;
现在,可以在订阅服务器上查询表。 你将看到它已从发布服务器接收数据。
SELECT * FROM basic;
可将更多的行添加到发布服务器的表中,并查看订阅服务器上的更改。
如果看不到数据,请启用
azure_pg_admin
的登录特权并检查表内容。ALTER ROLE azure_pg_admin login;
请访问 PostgreSQL 文档,详细了解逻辑复制。
在同一服务器上的数据库之间使用逻辑复制
若要设置同一 Azure Database for PostgreSQL 灵活服务器实例上驻留的不同数据库之间的逻辑复制,必须遵循特定准则,以避免当前存在的实现限制。 到目前为止,只有当复制槽不是在同一命令中创建的时,创建连接到同一数据库群集的订阅才会成功;否则,CREATE SUBSCRIPTION
调用将在 LibPQWalReceiverReceive
等待事件上挂起。 这是 Postgres 引擎中的一项现有限制导致的,在将来的版本中可能会移除此限制。
要在同一服务器上的“源”和“目标”数据库之间有效地设置逻辑复制,同时规避此限制,请执行以下步骤:
首先,在源数据库和目标数据库中使用相同的架构创建一个名为“basic”的表:
-- Run this on both source and target databases
CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT);
接下来,在源数据库中,为该表创建一个发布,并使用 pg_create_logical_replication_slot
函数单独创建一个逻辑复制槽,这有助于避免挂起问题,当在与订阅相同的命令中创建槽时通常会出现挂起问题。 你需要使用 pgoutput
插件:
-- Run this on the source database
CREATE PUBLICATION pub FOR TABLE basic;
SELECT pg_create_logical_replication_slot('myslot', 'pgoutput');
然后,在目标数据库中,创建对以前创建的发布的订阅,确保将 create_slot
设置为 false
,以防止 Azure Database for PostgreSQL 灵活服务器创建新槽,并正确指定在上一步中创建的槽名称。 在运行此命令之前,请将连接字符串中的占位符替换为你的实际数据库凭据:
-- Run this on the target database
CREATE SUBSCRIPTION sub
CONNECTION 'dbname=<source dbname> host=<server>.postgres.database.chinacloudapi.cn port=5432 user=<rep_user> password=<password>'
PUBLICATION pub
WITH (create_slot = false, slot_name='myslot');
设置逻辑复制后,你现在可以如下所述对其进行测试:将一条新记录插入到源数据库中的“basic”表中,然后验证该记录是否复制到目标数据库:
-- Run this on the source database
INSERT INTO basic SELECT 3, 'mango';
-- Run this on the target database
TABLE basic;
如果所有配置都正确,则你在目标数据库中应当会看到来自源数据库的新记录,从而确认逻辑复制已成功设置。
pglogical 扩展
下面是在提供者数据库服务器和订阅服务器上配置 pglogical 的示例。 有关详细信息,请参阅 pglogical 扩展文档。 此外,请确保你已执行上面列出的先决条件任务。
在提供者数据库服务器和订阅者数据库服务器的数据库中安装 pglogical 扩展。
\c myDB CREATE EXTENSION pglogical;
如果复制用户不同于(创建服务器的)服务器管理用户,请确保向用户授予具有角色
azure_pg_admin
的成员身份,并为用户分配 REPLICATION 和 LOGIN 属性。 有关详细信息,请参阅 pglogical 文档。GRANT azure_pg_admin to myUser; ALTER ROLE myUser REPLICATION LOGIN;
在提供者(源/发布者)数据库服务器上,创建提供者节点。
select pglogical.create_node( node_name := 'provider1', dsn := ' host=myProviderServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>');
创建一个复制集。
select pglogical.create_replication_set('myreplicationset');
将数据库中的所有表添加到该复制集。
SELECT pglogical.replication_set_add_all_tables('myreplicationset', '{public}'::text[]);
也可将特定架构(例如 testUser)中的表添加到默认的复制集,这是替代方法。
SELECT pglogical.replication_set_add_all_tables('default', ARRAY['testUser']);
在订阅者数据库服务器上,创建一个订阅者节点。
select pglogical.create_node( node_name := 'subscriber1', dsn := ' host=mySubscriberServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>' );
创建一个订阅以开始同步和复制过程。
select pglogical.create_subscription ( subscription_name := 'subscription1', replication_sets := array['myreplicationset'], provider_dsn := 'host=myProviderServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>');
然后即可验证订阅状态。
SELECT subscription_name, status FROM pglogical.show_subscription_status();
注意
Pglogical 目前不支持自动 DDL 复制。 可以使用 pg_dump 手动复制初始架构(仅限架构)。 可以使用 pglogical.replicate_ddl_command 函数同时在提供程序和订阅服务器上执行 DDL 语句。 请注意此处列出的扩展的其他限制。
逻辑解码
可以通过流式处理协议或 SQL 接口使用逻辑解码。
流式处理协议
我们通常倾向于通过流式处理协议使用更改。 你可以创建自己的使用者/连接器,或者使用 Debezium 之类的第三方服务。
请访问 wal2json 文档,查看将流式处理协议与 pg_recvlogical 配合使用的示例。
SQL 接口
以下示例将 SQL 接口与 wal2json 插件配合使用。
创建槽。
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
发出 SQL 命令。 例如:
CREATE TABLE a_table ( id varchar(40) NOT NULL, item varchar(40), PRIMARY KEY (id) ); INSERT INTO a_table (id, item) VALUES ('id1', 'item1'); DELETE FROM a_table WHERE id='id1';
使用更改。
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
输出如下所示:
{ "change": [ ] } { "change": [ { "kind": "insert", "schema": "public", "table": "a_table", "columnnames": ["id", "item"], "columntypes": ["character varying(40)", "character varying(40)"], "columnvalues": ["id1", "item1"] } ] } { "change": [ { "kind": "delete", "schema": "public", "table": "a_table", "oldkeys": { "keynames": ["id"], "keytypes": ["character varying(40)"], "keyvalues": ["id1"] } } ] }
用完槽后,请将其删除。
SELECT pg_drop_replication_slot('test_slot');
请访问 PostgreSQL 文档,详细了解逻辑解码。
监视器
必须监视逻辑解码。 必须删除任何未使用的复制槽。 在读取更改之前,槽会一直保存到 Postgres WAL 日志和相关的系统目录。 如果你的订阅服务器或使用者失败或者配置不正确,则未使用的日志将不断堆积,直至填满存储。 此外,未使用的日志会增大事务 ID 换行的风险。 这两种情况可能会导致服务器不可用。 因此,逻辑复制槽必须被持续使用。 如果不再使用某个逻辑复制槽,请立即将其删除。
pg_replication_slots
视图中的“active”列指示是否有使用者连接到某个槽。
SELECT * FROM pg_replication_slots;
针对使用的事务 ID 上限和使用的存储 Azure Database for PostgreSQL 灵活服务器指标设置警报,以便在值超过正常阈值时通知你。
限制
- 逻辑复制限制适用,如此处所述。
重要
如果相应的订阅服务器不再存在,则必须删除主服务器中的逻辑复制槽。 否则,WAL 文件开始在主存储中累积,从而填满存储。 假设存储阈值超出特定阈值,并且逻辑复制槽未被使用(由于订阅服务器不可用)。 在这种情况下,Azure Database for PostgreSQL 灵活服务器实例会自动删除未使用的逻辑复制槽。 此操作会释放累积的 WAL 文件,避免服务器因存储空间被占满而不可用。