Azure Database for PostgreSQL 灵活服务器中的逻辑复制和逻辑解码

适用于:Azure Database for PostgreSQL - 灵活服务器

Azure Database for PostgreSQL 灵活服务器支持以下逻辑数据提取和复制方法:

  1. 逻辑复制

    1. 使用 PostgreSQL 本机逻辑复制复制数据对象。 逻辑复制允许对数据复制(包括表级数据复制)进行精细控制。
    2. 使用 pglogical 扩展提供逻辑流式复制和更多功能,如复制数据库初始架构、支持 TRUNCATE、复制 DDL 等。
  2. 逻辑解码:通过对预写日志 (WAL) 的内容进行解码来实现。

比较逻辑复制和逻辑解码

逻辑复制和逻辑解码具有一些相似之处。 这两项技术都具有以下特点:

这两种技术存在不同之处:

逻辑复制:

  • 允许指定要复制的表或表集。

逻辑解码:

  • 提取数据库中所有表的更改。

逻辑复制和逻辑解码的先决条件

  1. 转到门户中的服务器参数页。

  2. 将服务器参数 wal_level 设置为 logical

  3. 如果要使用 pglogical 扩展,请搜索 shared_preload_librariesazure.extensions 参数,然后从下拉列表框中选择 pglogical

  4. 请将 max_worker_processes 参数值更新为一个至少 16 的数字。 否则,你可能会遇到 WARNING: out of background worker slots 之类的问题。

  5. 保存更改并重启服务器以应用更改。

  6. 确认 Azure Database for PostgreSQL 灵活服务器实例允许来自连接资源的网络流量。

  7. 授予管理员用户复制权限。

    ALTER ROLE <adminname> WITH REPLICATION;
    
  8. 你可能需要确保你使用的角色对要复制的架构具有特权。 否则,你可能会遇到诸如 Permission denied for schema 之类的错误。

注意

最好将复制用户与常规管理员帐户分开。

使用逻辑复制和逻辑解码

使用本机逻辑复制是从 Azure Database for PostgreSQL 灵活服务器复制数据的最简单方法。 你可以使用 SQL 接口或流式处理协议来使用更改。 你还可以使用 SQL 接口通过逻辑解码来使用更改。

本机逻辑复制

逻辑复制使用术语“发布服务器”和“订阅服务器”。

  • 发布者是要发送其中的数据的 Azure Database for PostgreSQL 灵活服务器数据库。
  • 订阅者是要将数据发送到的 Azure Database for PostgreSQL 灵活服务器数据库。

下面是可用于尝试逻辑复制的一些示例代码。

  1. 连接到发布服务器数据库。 创建表并添加一些数据。

    CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT);
    INSERT INTO basic VALUES (1, 'apple');
    INSERT INTO basic VALUES (2, 'banana');
    
  2. 为表创建发布。

    CREATE PUBLICATION pub FOR TABLE basic;
    
  3. 连接到订阅服务器数据库。 使用发布服务器上的同一架构创建表。

    CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT);
    
  4. 创建一个将连接到你之前创建的发布的订阅。

    CREATE SUBSCRIPTION sub CONNECTION 'host=<server>.postgres.database.chinacloudapi.cn user=<rep_user> dbname=<dbname> password=<password>' PUBLICATION pub;
    
  5. 现在,可以在订阅服务器上查询表。 你将看到它已从发布服务器接收数据。

    SELECT * FROM basic;
    

    可将更多的行添加到发布服务器的表中,并查看订阅服务器上的更改。

    如果看不到数据,请启用 azure_pg_admin 的登录特权并检查表内容。

    ALTER ROLE azure_pg_admin login;
    

请访问 PostgreSQL 文档,详细了解逻辑复制

在同一服务器上的数据库之间使用逻辑复制

若要设置同一 Azure Database for PostgreSQL 灵活服务器实例上驻留的不同数据库之间的逻辑复制,必须遵循特定准则,以避免当前存在的实现限制。 到目前为止,只有当复制槽不是在同一命令中创建的时,创建连接到同一数据库群集的订阅才会成功;否则,CREATE SUBSCRIPTION 调用将在 LibPQWalReceiverReceive 等待事件上挂起。 这是 Postgres 引擎中的一项现有限制导致的,在将来的版本中可能会移除此限制。

要在同一服务器上的“源”和“目标”数据库之间有效地设置逻辑复制,同时规避此限制,请执行以下步骤:

首先,在源数据库和目标数据库中使用相同的架构创建一个名为“basic”的表:

-- Run this on both source and target databases
CREATE TABLE basic (id INTEGER NOT NULL PRIMARY KEY, a TEXT);

接下来,在源数据库中,为该表创建一个发布,并使用 pg_create_logical_replication_slot 函数单独创建一个逻辑复制槽,这有助于避免挂起问题,当在与订阅相同的命令中创建槽时通常会出现挂起问题。 你需要使用 pgoutput 插件:

-- Run this on the source database
CREATE PUBLICATION pub FOR TABLE basic;
SELECT pg_create_logical_replication_slot('myslot', 'pgoutput');

然后,在目标数据库中,创建对以前创建的发布的订阅,确保将 create_slot 设置为 false,以防止 Azure Database for PostgreSQL 灵活服务器创建新槽,并正确指定在上一步中创建的槽名称。 在运行此命令之前,请将连接字符串中的占位符替换为你的实际数据库凭据:

-- Run this on the target database
CREATE SUBSCRIPTION sub
   CONNECTION 'dbname=<source dbname> host=<server>.postgres.database.chinacloudapi.cn port=5432 user=<rep_user> password=<password>'
   PUBLICATION pub
   WITH (create_slot = false, slot_name='myslot');

设置逻辑复制后,你现在可以如下所述对其进行测试:将一条新记录插入到源数据库中的“basic”表中,然后验证该记录是否复制到目标数据库:

-- Run this on the source database
INSERT INTO basic SELECT 3, 'mango';

-- Run this on the target database
TABLE basic;

如果所有配置都正确,则你在目标数据库中应当会看到来自源数据库的新记录,从而确认逻辑复制已成功设置。

pglogical 扩展

下面是在提供者数据库服务器和订阅服务器上配置 pglogical 的示例。 有关详细信息,请参阅 pglogical 扩展文档。 此外,请确保你已执行上面列出的先决条件任务。

  1. 在提供者数据库服务器和订阅者数据库服务器的数据库中安装 pglogical 扩展。

    \c myDB
    CREATE EXTENSION pglogical;
    
  2. 如果复制用户不同于(创建服务器的)服务器管理用户,请确保向用户授予具有角色 azure_pg_admin 的成员身份,并为用户分配 REPLICATION 和 LOGIN 属性。 有关详细信息,请参阅 pglogical 文档

    GRANT azure_pg_admin to myUser;
    ALTER ROLE myUser REPLICATION LOGIN;
    
  3. 在提供者(源/发布者)数据库服务器上,创建提供者节点。

    select pglogical.create_node( node_name := 'provider1',
    dsn := ' host=myProviderServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>');
    
  4. 创建一个复制集。

    select pglogical.create_replication_set('myreplicationset');
    
  5. 将数据库中的所有表添加到该复制集。

    SELECT pglogical.replication_set_add_all_tables('myreplicationset', '{public}'::text[]);
    

    也可将特定架构(例如 testUser)中的表添加到默认的复制集,这是替代方法。

    SELECT pglogical.replication_set_add_all_tables('default', ARRAY['testUser']);
    
  6. 在订阅者数据库服务器上,创建一个订阅者节点。

    select pglogical.create_node( node_name := 'subscriber1',
    dsn := ' host=mySubscriberServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>' );
    
  7. 创建一个订阅以开始同步和复制过程。

    select pglogical.create_subscription (
    subscription_name := 'subscription1',
    replication_sets := array['myreplicationset'],
    provider_dsn := 'host=myProviderServer.postgres.database.chinacloudapi.cn port=5432 dbname=myDB user=myUser password=<password>');
    
  8. 然后即可验证订阅状态。

    SELECT subscription_name, status FROM pglogical.show_subscription_status();
    

注意

Pglogical 目前不支持自动 DDL 复制。 可以使用 pg_dump 手动复制初始架构(仅限架构)。 可以使用 pglogical.replicate_ddl_command 函数同时在提供程序和订阅服务器上执行 DDL 语句。 请注意此处列出的扩展的其他限制。

逻辑解码

可以通过流式处理协议或 SQL 接口使用逻辑解码。

流式处理协议

我们通常倾向于通过流式处理协议使用更改。 你可以创建自己的使用者/连接器,或者使用 Debezium 之类的第三方服务。

请访问 wal2json 文档,查看将流式处理协议与 pg_recvlogical 配合使用的示例

SQL 接口

以下示例将 SQL 接口与 wal2json 插件配合使用。

  1. 创建槽。

    SELECT * FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    
  2. 发出 SQL 命令。 例如:

    CREATE TABLE a_table (
       id varchar(40) NOT NULL,
       item varchar(40),
       PRIMARY KEY (id)
    );
    
    INSERT INTO a_table (id, item) VALUES ('id1', 'item1');
    DELETE FROM a_table WHERE id='id1';
    
  3. 使用更改。

    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
    

    输出如下所示:

    {
          "change": [
          ]
    }
    {
          "change": [
                   {
                            "kind": "insert",
                            "schema": "public",
                            "table": "a_table",
                            "columnnames": ["id", "item"],
                            "columntypes": ["character varying(40)", "character varying(40)"],
                            "columnvalues": ["id1", "item1"]
                   }
          ]
    }
    {
          "change": [
                   {
                            "kind": "delete",
                            "schema": "public",
                            "table": "a_table",
                            "oldkeys": {
                                  "keynames": ["id"],
                                  "keytypes": ["character varying(40)"],
                                  "keyvalues": ["id1"]
                            }
                   }
          ]
    }
    
  4. 用完槽后,请将其删除。

    SELECT pg_drop_replication_slot('test_slot');
    

请访问 PostgreSQL 文档,详细了解逻辑解码

监视器

必须监视逻辑解码。 必须删除任何未使用的复制槽。 在读取更改之前,槽会一直保存到 Postgres WAL 日志和相关的系统目录。 如果你的订阅服务器或使用者失败或者配置不正确,则未使用的日志将不断堆积,直至填满存储。 此外,未使用的日志会增大事务 ID 换行的风险。 这两种情况可能会导致服务器不可用。 因此,逻辑复制槽必须被持续使用。 如果不再使用某个逻辑复制槽,请立即将其删除。

pg_replication_slots 视图中的“active”列指示是否有使用者连接到某个槽。

SELECT * FROM pg_replication_slots;

针对使用的事务 ID 上限和使用的存储 Azure Database for PostgreSQL 灵活服务器指标设置警报,以便在值超过正常阈值时通知你。

限制

  • 逻辑复制限制适用,如此处所述。

重要

如果相应的订阅服务器不再存在,则必须删除主服务器中的逻辑复制槽。 否则,WAL 文件开始在主存储中累积,从而填满存储。 假设存储阈值超出特定阈值,并且逻辑复制槽未被使用(由于订阅服务器不可用)。 在这种情况下,Azure Database for PostgreSQL 灵活服务器实例会自动删除未使用的逻辑复制槽。 此操作会释放累积的 WAL 文件,避免服务器因存储空间被占满而不可用。