如何在 Azure Cosmos DB for PostgreSQL 中使用 pg_azure_storage 引入数据

适用对象:PostgreSQL 的 Azure Cosmos DB (由 PostgreSQL 的 Citus 数据库扩展提供支持)

本文介绍如何使用 pg_azure_storage PostgreSQL 扩展直接从 Azure Blob 存储 (ABS) 操作数据并将其加载到 Azure Cosmos DB for PostgreSQL 中。 ABS 是云原生可缩放、持久且安全的存储服务。 这些特征使它成为将现有数据存储和移动到云的一种很好的选择。

准备数据库和 Blob 存储

若要从 Azure Blob 存储加载数据,请在数据库中安装 pg_azure_storage PostgreSQL 扩展:

SELECT * FROM create_extension('azure_storage');

重要

pg_azure_storage 扩展仅在运行 PostgreSQL 13 及更高版本的 Azure Cosmos DB for PostgreSQL 群集上可用。

我们为本文准备了一个公共演示数据集。 若要使用自己的数据集,请参阅将本地数据迁移到云存储,了解如何将数据集高效迁移到 Azure Blob 存储。

注意

选择“容器(匿名读取访问容器和 blob)”,即可使用其公共 URL 从 Azure Blob 存储引入文件并枚举容器内容,而无需在 pg_azure_storage 中配置帐户密钥。 访问级别设置为“专用(无匿名访问)”或“Blob (仅匿名读取访问 blob)”的容器将需要访问密钥。

列出容器内容

为此操作方法预先创建了一个演示 Azure Blob 存储帐户和容器。 容器名称是 github,它位于 pgquickstart 帐户中。 我们可以轻松地使用 azure_storage.blob_list(account, container) 函数查看容器中存在的文件。

SELECT path, bytes, pg_size_pretty(bytes), content_type
  FROM azure_storage.blob_list('pgquickstart','github');
-[ RECORD 1 ]--+-------------------
path           | events.csv.gz
bytes          | 41691786
pg_size_pretty | 40 MB
content_type   | application/x-gzip
-[ RECORD 2 ]--+-------------------
path           | users.csv.gz
bytes          | 5382831
pg_size_pretty | 5257 kB
content_type   | application/x-gzip

可以使用常规 SQL WHERE 子句或使用 blob_list UDF 的 prefix 参数来筛选输出。 后者会筛选 Azure Blob 存储端返回的行。

注意

列出容器内容需要帐户和访问密钥或启用了匿名访问的容器。

SELECT * FROM azure_storage.blob_list('pgquickstart','github','e');
-[ RECORD 1 ]----+---------------------------------
path             | events.csv.gz
bytes            | 41691786
last_modified    | 2022-10-12 18:49:51+00
etag             | 0x8DAAC828B970928
content_type     | application/x-gzip
content_encoding |
content_hash     | 473b6ad25b7c88ff6e0a628889466aed
SELECT *
  FROM azure_storage.blob_list('pgquickstart','github')
 WHERE path LIKE 'e%';
-[ RECORD 1 ]----+---------------------------------
path             | events.csv.gz
bytes            | 41691786
last_modified    | 2022-10-12 18:49:51+00
etag             | 0x8DAAC828B970928
content_type     | application/x-gzip
content_encoding |
content_hash     | 473b6ad25b7c88ff6e0a628889466aed

从 ABS 加载数据

使用 COPY 命令加载数据

首先创建示例架构。

CREATE TABLE github_users
(
	user_id bigint,
	url text,
	login text,
	avatar_url text,
	gravatar_id text,
	display_login text
);

CREATE TABLE github_events
(
	event_id bigint,
	event_type text,
	event_public boolean,
	repo_id bigint,
	payload jsonb,
	repo jsonb,
	user_id bigint,
	org jsonb,
	created_at timestamp
);

CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);

SELECT create_distributed_table('github_users', 'user_id');
SELECT create_distributed_table('github_events', 'user_id');

将数据加载到表中变得像调用 COPY 命令一样简单。

-- download users and store in table

COPY github_users
FROM 'https://pgquickstart.blob.core.chinacloudapi.cn/github/users.csv.gz';

-- download events and store in table

COPY github_events
FROM 'https://pgquickstart.blob.core.chinacloudapi.cn/github/events.csv.gz';

请注意扩展如何识别提供给 copy 命令的 URL 来自 Azure Blob 存储,我们指向的文件是 gzip 压缩的,它也会为我们自动处理。

COPY 命令支持更多参数和格式。 在上面的示例中,基于文件扩展名自动选择格式和压缩。 但是,可以直接提供与常规 COPY 命令类似的格式。

COPY github_users
FROM 'https://pgquickstart.blob.core.chinacloudapi.cn/github/users.csv.gz'
WITH (FORMAT 'csv');

目前,该扩展支持以下文件格式:

format description
csv PostgreSQL COPY 使用的逗号分隔值格式
tsv 制表符分隔值,默认 PostgreSQL COPY 格式
binary 二进制 PostgreSQL COPY 格式
text 包含单个文本值的文件(例如大型 JSON 或 XML)

使用 blob_get() 加载数据

COPY 命令很方便,但灵活性有限。 在内部,COPY 使用 blob_get 函数,在更加复杂的方案中可以直接使用它来操作数据。

SELECT *
  FROM azure_storage.blob_get(
         'pgquickstart', 'github',
         'users.csv.gz', NULL::github_users
       )
 LIMIT 3;
-[ RECORD 1 ]-+--------------------------------------------
user_id       | 21
url           | https://api.github.com/users/technoweenie
login         | technoweenie
avatar_url    | https://avatars.githubusercontent.com/u/21?
gravatar_id   |
display_login | technoweenie
-[ RECORD 2 ]-+--------------------------------------------
user_id       | 22
url           | https://api.github.com/users/macournoyer
login         | macournoyer
avatar_url    | https://avatars.githubusercontent.com/u/22?
gravatar_id   |
display_login | macournoyer
-[ RECORD 3 ]-+--------------------------------------------
user_id       | 38
url           | https://api.github.com/users/atmos
login         | atmos
avatar_url    | https://avatars.githubusercontent.com/u/38?
gravatar_id   |
display_login | atmos

注意

在上述查询中,文件在应用 LIMIT 3 之前完全提取。

使用此函数,可以在复杂查询中动态操作数据,并以 INSERT FROM SELECT 的方式执行导入。

INSERT INTO github_users
     SELECT user_id, url, UPPER(login), avatar_url, gravatar_id, display_login
       FROM azure_storage.blob_get('pgquickstart', 'github', 'users.csv.gz', NULL::github_users)
      WHERE gravatar_id IS NOT NULL;
INSERT 0 264308

在上述命令中,我们将数据筛选到具有 gravatar_id 的帐户,并动态地将其登录名设置为大写。

blob_get() 选项

在某些情况下,可能需要确切地控制 decodercompressionoptions 参数尝试执行的 blob_get 操作。

解码器可以设置为 auto(默认)或以下任何值:

format description
csv PostgreSQL COPY 使用的逗号分隔值格式
tsv 制表符分隔值,默认 PostgreSQL COPY 格式
binary 二进制 PostgreSQL COPY 格式
text 包含单个文本值的文件(例如大型 JSON 或 XML)

compression 可以是 auto(默认)、nonegzip

最后,options 参数的类型为 jsonb。 有四个实用工具函数可为其生成值。 每个实用工具函数都为与其名称匹配的解码器指定。

解码器 选项函数
csv options_csv_get
tsv options_tsv
binary options_binary
text options_copy

通过查看函数定义,可以看到哪款解码器支持哪些参数。

options_csv_get - delimiter、null_string、header、quote、escape、force_not_null、force_null、content_encoding options_tsv - delimiter、null_string、content_encoding options_copy - delimiter、null_string、header、quote、escape、force_quote、force_not_null、force_null、content_encoding。 options_binary - content_encoding

了解上述内容后,我们可以在分析过程中放弃具有 null gravatar_id 的录制内容。

INSERT INTO github_users
     SELECT user_id, url, UPPER(login), avatar_url, gravatar_id, display_login
       FROM azure_storage.blob_get('pgquickstart', 'github', 'users.csv.gz', NULL::github_users,
                                    options := azure_storage.options_csv_get(force_not_null := ARRAY['gravatar_id']));
INSERT 0 264308

访问专用存储

  1. 获取帐户名称和访问密钥

    如果没有访问密钥,则不允许列出设置为“专用”或“Blob”访问级别的容器。

    SELECT * FROM azure_storage.blob_list('mystorageaccount','privdatasets');
    
    ERROR:  azure_storage: missing account access key
    HINT:  Use SELECT azure_storage.account_add('<account name>', '<access key>')
    

    在存储帐户中,打开“访问密钥”。 复制存储帐户名称,然后从“key1”部分复制密钥(必须先选择密钥旁边的“显示”)。

    Screenshot of Security + networking > Access keys section of an Azure Blob Storage page in the Azure portal.

  2. 将帐户添加到 pg_azure_storage

    SELECT azure_storage.account_add('mystorageaccount', 'SECRET_ACCESS_KEY');
    

    现在,你可以为该存储列出设置为“专用”和“Blob”访问级别的容器,但只能以被授予了 azure_storage_admin 角色的 citus 用户身份。 如果创建一个名为 support 的新用户,则默认情况下不允许访问容器内容。

    SELECT * FROM azure_storage.blob_list('pgabs','dataverse');
    
    ERROR:  azure_storage: current user support is not allowed to use storage account pgabs
    
  3. 允许 support 用户使用特定的 Azure Blob 存储帐户

    授予权限与调用 account_user_add 一样简单。

    SELECT * FROM azure_storage.account_user_add('mystorageaccount', 'support');
    

    可以在 account_list 输出中看到允许的用户,其中显示已定义访问密钥的所有帐户。

    SELECT * FROM azure_storage.account_list();
    
     account_name     | allowed_users
    ------------------+---------------
     mystorageaccount | {support}
    (1 row)
    

    如果你决定用户不应再具有访问权限, 只需调用 account_user_remove 即可。

    SELECT * FROM azure_storage.account_user_remove('mystorageaccount', 'support');
    

后续步骤

恭喜,你已学习完如何直接从 Azure Blob 存储将数据加载到 Azure Cosmos DB for PostgreSQL 中。