适用于:Azure 数据工厂
Azure Synapse Analytics
本文概述了如何使用 Azure 数据工厂和 Azure Synapse 管道中的复制活动从/向 Snowflake 复制数据,并使用数据流转换 Snowflake 中的数据。 有关详细信息,请参阅数据工厂或 Azure Synapse Analytics 的简介文章。
重要
Snowflake V2 连接器增强了对本机 Snowflake 的支持。 如果你在解决方案中使用 Snowflake V1 连接器,建议尽早升级 Snowflake 连接器。 有关 V2 和 V1 之间的差异的详细信息,请参阅此部分。
此 Snowflake 连接器支持以下功能:
① Azure 集成运行时 ② 自承载集成运行时
对于复制活动,此 Snowflake 连接器支持以下功能:
- 从 Snowflake 复制数据,利用 Snowflake 的 COPY into [location] 命令实现最佳性能。
- 将数据复制到 Snowflake 中:利用 Snowflake 的 COPY into [table] 命令实现最佳性能。 它支持 Azure 上的 Snowflake。
- 如果需要一个代理,以便从自承载 Integration Runtime 连接到 Snowflake,则必须在 Integration Runtime 主机上为 HTTP_PROXY 和 HTTPS_PROXY 配置环境变量。
如果数据存储位于本地网络、Azure 虚拟网络或 Amazon Virtual Private Cloud 内部,则需要配置自承载集成运行时才能连接到该数据存储。 确保将自承载集成运行时使用的 IP 地址添加到允许列表中。
如果数据存储是托管的云数据服务,则可以使用 Azure Integration Runtime。 如果访问范围限制为防火墙规则中允许的 IP,你可以将 Azure Integration Runtime IP 添加到允许的列表。
用于源或接收器的 Snowflake 帐户应对数据库具有必要的 USAGE
访问权限,以及对架构及其下的表/视图的读/写访问权限。 此外,它还应该在架构上具有 CREATE STAGE
,以便能够使用 SAS URI 创建外部阶段。
必须设置以下帐户属性值
属性 | 描述 | 必需 | 默认 |
---|---|---|---|
阶段创建需要存储集成 | 指定在创建命名的外部阶段(使用 CREATE STAGE)访问私有云存储位置时是否需要存储集成对象作为云凭证。 | 假 | 假 |
阶段操作所需的存储集成 | 指定在从私有云存储位置加载数据或将数据卸载到私有云存储位置时,是否需要使用引用存储集成对象作为云凭证的命名外部阶段。 | 假 | 假 |
要详细了解网络安全机制和数据工厂支持的选项,请参阅数据访问策略。
若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:
使用以下步骤在 Azure 门户 UI 中创建一个到 Snowflake 的链接服务。
浏览到 Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:
搜索“Snowflake”并选择“Snowflake 连接器”。
配置服务详细信息、测试连接并创建新的链接服务。
以下部分详细介绍了用来定义特定于 Snowflake 连接器的实体的属性。
Snowflake 链接服务支持以下通用属性:
属性 | 描述 | 必需 |
---|---|---|
类型 | type 属性必须设置为“SnowflakeV2”。 | 是 |
版本 | 指定的版本。 建议升级到最新版本,以利用最新的增强功能。 | 是,针对版本 1.1(预览版) |
账户标识符 | 帐户的名称及其组织。 例如,myorg-account123。 | 是 |
数据库 | 连接后用于会话的默认数据库。 | 是 |
仓库 | 连接后用于会话的默认虚拟仓库。 | 是 |
认证类型 | 用于连接到 Snowflake 服务的身份验证类型。 允许的值为:Basic(默认)和 KeyPair。 有关其他属性和示例,请参阅下面的相应部分。 | 否 |
角色 | 连接后用于会话的默认安全角色。 | 否 |
主机 | Snowflake 帐户的主机名。 例如:contoso.snowflakecomputing.com 。 还支持 .cn 。 |
否 |
connectVia | 用于连接到数据存储的集成运行时。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定,则使用默认 Azure Integration Runtime。 | 否 |
此 Snowflake 连接器支持以下身份验证类型。 有关详细信息,请参阅相应部分。
要使用基本身份验证,除了上一部分中所述的通用属性外,还需要指定以下属性:
属性 | 描述 | 必需 |
---|---|---|
用户名 | Snowflake 用户的登录名。 | 是 |
密码 | Snowflake 用户的密码。 将此字段标记为 SecureString 类型以将其安全存储。 此外,还可以引用 Azure Key Vault 中存储的机密。 | 是 |
示例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Azure 密钥保管库中的密码:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
若要使用密钥对身份验证,需要参考密钥对身份验证和密钥对轮换,在 Snowflake 中配置和创建密钥对身份验证用户。 之后,请记下用于定义链接服务的私钥和通行短语(可选)。
除了前面部分所述的通用属性,还指定以下属性:
属性 | 描述 | 必需 |
---|---|---|
用户名 | Snowflake 用户的登录名。 | 是 |
私钥 | 用于密钥对身份验证的私钥。 为了确保私钥在发送到 Azure 数据工厂时有效,并且考虑到 privateKey 文件包含换行符 (\n),必须以字符串字面量形式正确设置 privateKey 内容的格式。 此过程涉及在每个换行符处显式添加 \n。 |
是 |
私钥密码短语 | 用于解密私钥的通行短语(如果已加密)。 | 否 |
示例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
备注
对于映射数据流,我们建议使用 PEM 格式的 PKCS#8 标准(.p8 文件)生成新的 RSA 私钥。
有关可用于定义数据集的各部分和属性的完整列表,请参阅数据集一文。
Snowflake 数据集支持以下属性。
属性 | 描述 | 必需 |
---|---|---|
类型 | 数据集的 type 属性必须设置为 SnowflakeV2Table。 | 是 |
架构 | 架构的名称。 请注意,架构名称区分大小写。 | 对源为“否”,对汇为“是” |
桌子 | 表/视图的名称。 请注意,表名称区分大小写。 | 对源为“否”,对汇为“是” |
示例:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
有关可用于定义活动的各部分和属性的完整列表,请参阅管道一文。 本部分提供 Snowflake 源和接收器支持的属性列表。
Snowflake 连接器利用 Snowflake 的 COPY into [location] 命令实现最佳性能。
如果 Snowflake 的 COPY 命令以本机方式支持接收器数据存储和格式,则可使用复制活动将数据从 Snowflake 直接复制到接收器。 有关详细信息,请参阅从 Snowflake 进行的直接复制。 否则,请使用内置的从 Snowflake 进行的暂存复制。
从 Snowflake 复制数据时,复制活动的“源”部分支持以下属性。
属性 | 描述 | 必需 |
---|---|---|
类型 | 复制活动源的类型属性必须设置为 SnowflakeV2Source。 | 是 |
查询 | 指定要从 Snowflake 读取数据的 SQL 查询。 如果架构、表和列的名称包含小写字母,请在查询中引用对象标识符,例如 select * from "schema"."myTable" 。不支持执行存储过程。 |
否 |
导出设置 | 用于从 Snowflake 检索数据的高级设置。 可以配置 COPY into 命令支持的此类设置,在调用相关语句时,此服务会传递此类设置。 | 是 |
在 exportSettings 下: |
||
类型 | 导出命令的类型,设置为 SnowflakeExportCopyCommand。 | 是 |
存储集成 | 指定在 Snowflake 中创建的存储集成的名称。 有关使用存储集成的先决条件步骤,请参阅配置 Snowflake 存储集成。 | 否 |
附加复制选项 | 其他复制选项,作为键值对的字典提供。 示例:MAX_FILE_SIZE、OVERWRITE。 有关详细信息,请参阅 Snowflake 复制选项。 | 否 |
附加格式选项 | 作为键值对的字典提供给 COPY 命令的其他文件格式选项。 示例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT、NULL_IF。 有关详细信息,请参阅 Snowflake 格式类型选项。 使用 NULL_IF 时,Snowflake 中的 NULL 值在写入暂存存储中的分隔文本文件时会转换为指定值(需要用单引号括住)。 从暂存文件读取到目标存储时,此指定值被视为 NULL。 默认值是 'NULL' 。 |
否 |
备注
请确保你有执行以下命令的权限,并能够访问模式 INFORMATION_SCHEMA 和表 COLUMNS。
COPY INTO <location>
如果接收器数据存储和格式符合此部分所述条件,则可使用复制活动将数据从 Snowflake 直接复制到接收器。 该服务将检查设置,如果不符合以下条件,复制活动运行将会失败:
在源中指定
storageIntegration
时:接收器数据存储是你在 Snowflake 中的外部阶段引用的 Azure Blob 存储。 复制数据之前,需要完成以下步骤:
使用任何受支持的身份验证类型为接收器 Azure Blob 存储创建 Azure Blob 存储链接服务。
在接收器 Azure Blob 存储访问控制 (IAM) 中至少向 Snowflake 服务主体授予存储 Blob 数据参与者角色。
当你未在源中指定
storageIntegration
时:“接收器链接服务”是使用共享访问签名身份验证的 Azure Blob 存储。 若要采用下面受支持的格式将数据直接复制到 Azure Data Lake Storage Gen2,可以创建带有针对 Azure Data Lake Storage Gen2 帐户的 SAS 身份验证功能的 Azure Blob 存储链接服务,从而避免使用从 Snowflake 进行的分阶段复制。
接收器数据格式为“Parquet”、“带分隔符的文本”或“JSON”,其配置如下:
- 对于Parquet格式,压缩编解码器为None、Snappy或Lzo。
- 对于“带分隔符的文本”格式:
-
rowDelimiter
为 \r\n 或任何单个字符。 -
compression
可为“无压缩”、 gzip、bzip2 或 deflate。 -
encodingName
保留为默认值或设置为 utf-8。 -
quoteChar
为双引号、单引号或空字符串(无引号字符) 。
-
- 对于“JSON”格式,直接复制只支持以下情况:源 Snowflake 表或查询结果仅有一列且该列的数据类型是“VARIANT”、“OBJECT”或“ARRAY” 。
-
compression
可为“无压缩”、 gzip、bzip2 或 deflate。 -
encodingName
保留为默认值或设置为 utf-8。 -
filePattern
在复制活动接收器中保留为默认值或设置为“setOfObjects”。
-
在复制活动源中,
additionalColumns
未指定。列映射未指定。
示例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
如果接收器数据存储或格式与上一部分所述的 Snowflake COPY 命令并非以本机方式兼容,请通过临时的 Azure Blob 存储实例启用内置暂存复制。 暂存复制功能也能提供更高的吞吐量。 此服务将数据从 Snowflake 导出到临时存储,然后将数据复制到接收器,最后从临时存储中清除临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制。
若要使用此功能,请创建一个引用 Azure 存储帐户作为临时暂存位置的 Azure Blob 存储链接服务。 然后,在复制活动中指定 enableStaging
和 stagingSettings
属性。
在源中指定
storageIntegration
时,临时暂存 Azure Blob 存储应是你在 Snowflake 中的外部阶段引用的。 请确保在使用 Azure 集成运行时时,为其创建一个 Azure Blob 存储 链接服务,并使用任何支持的身份验证方式。或者,在使用自托管集成运行时时,请使用匿名、帐户密钥、共享访问签名或服务主体身份验证。 此外,在暂存 Azure Blob 存储访问控制 (IAM) 中至少向 Snowflake 服务主体授予存储 Blob 数据参与者角色。当你未在源中指定
storageIntegration
时,暂存 Azure Blob 存储链接服务必须使用 Snowflake COPY 命令所需的共享访问签名身份验证。 确保在暂存 Azure Blob 存储中向 Snowflake 授予适当的访问权限。 要了解详细信息,请参阅本文。
示例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
从 Snowflake 执行分阶段复制时,将接收器复制行为设置为“合并文件”至关重要。 此设置可确保正确处理和合并所有已分区的文件,从而防止仅复制最后一个已分区文件的问题。
示例配置
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
备注
如果未将汇聚复制行为设置为合并文件,可能会导致只复制最后一个分区文件。
Snowflake 连接器利用 Snowflake 的 COPY into [table] 命令实现最佳性能。 它支持将数据写入 Azure 上的 Snowflake。
如果 Snowflake 的 COPY 命令可以原生支持源数据存储和格式,则可以使用复制操作将数据直接从源复制到 Snowflake。 有关详细信息,请参阅直接复制到 Snowflake。 否则,请使用内置的暂存复制到 Snowflake。
若要将数据复制到 Snowflake,复制活动的“接收器”部分需要支持以下属性。
属性 | 描述 | 必需 |
---|---|---|
类型 | “复制”活动接收器的类型属性设置为 SnowflakeV2Sink。 | 是 |
preCopyScript | 指定在每次运行中将数据写入到 Snowflake 之前要由“复制”活动运行的 SQL 查询。 使用此属性清理预加载的数据。 | 否 |
导入设置 | 用于将数据写入 Snowflake 的高级设置。 可以配置 COPY into 命令支持的此类设置,在调用相关语句时,此服务会传递此类设置。 | 是 |
在 importSettings 下: |
||
类型 | 导入命令的类型,设置为 SnowflakeImportCopyCommand。 | 是 |
存储集成 | 指定在 Snowflake 中创建的存储集成的名称。 有关使用存储集成的先决条件步骤,请参阅配置 Snowflake 存储集成。 | 否 |
附加复制选项 | 其他复制选项,作为键值对的字典提供。 示例:ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 有关详细信息,请参阅 Snowflake 复制选项。 | 否 |
附加格式选项 | 提供给 COPY 命令的其他文件格式选项,作为键值对的字典提供。 示例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 有关详细信息,请参阅 Snowflake 格式类型选项。 | 否 |
备注
请确保你有执行以下命令的权限,并能够访问模式 INFORMATION_SCHEMA 和表 COLUMNS。
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
如果源数据存储和格式符合此部分所述条件,则可使用复制活动将数据从源直接复制到 Snowflake。 该服务将检查设置,如果不符合以下条件,复制活动运行将会失败:
在接收器中指定
storageIntegration
时:源数据存储是指在 Snowflake 的外部阶段中提到的 Azure Blob 存储。 复制数据之前,需要完成以下步骤:
在接收器中不指定
storageIntegration
时:“源链接服务”是使用“共享访问签名”身份验证的 Azure Blob 存储。 若要采用下面受支持的格式直接从 Azure Data Lake Storage Gen2 复制数据,可以创建带有针对 Azure Data Lake Storage Gen2 帐户的 SAS 身份验证功能的 Azure Blob 存储链接服务,从而避免使用到 Snowflake 的分阶段复制。
“源数据格式”为“Parquet”、“带分隔符的文本”或“JSON”,其配置如下 :
对于“Parquet”格式,压缩编解码器为“None”或“Snappy”。
对于“带分隔符的文本”格式:
-
rowDelimiter
为 \r\n 或任何单个字符。 如果行分隔符不是“\r\n”,则需将firstRowAsHeader
设置为 false,且不指定skipLineCount
。 -
compression
可为“无压缩”、 gzip、bzip2 或 deflate。 -
encodingName
保留为默认值或设置为“UTF-8”、“UTF-16”、“UTF-16BE”、“UTF-32”、“UTF-32BE”、“BIG5”、“EUC-JP”、“EUC-KR”、“GB18030”、“ISO-2022-JP”、“ISO-2022-KR”、“ISO-8859-1”、“ISO-8859-2”、“ISO-8859-5”、“ISO-8859-6”、“ISO-8859-7”、“ISO-8859-8”、“ISO-8859-9”、“WINDOWS-1250”、“WINDOWS-1251”、“WINDOWS-1252”、“WINDOWS-1253”、“WINDOWS-1254”、“WINDOWS-1255”。 -
quoteChar
为双引号、单引号或空字符串(无引号字符) 。
-
对于“JSON”格式,直接复制只支持以下情况:接收器 Snowflake 表仅有一列且该列的数据类型是“VARIANT”、“OBJECT”或“ARRAY”。
-
compression
可为“无压缩”、 gzip、bzip2 或 deflate。 -
encodingName
保留为默认值或设置为 utf-8。 - 列映射未指定。
-
在“复制”活动源中:
- 未指定
additionalColumns
。 - 如果源为文件夹,则将
recursive
设置为 true。 -
prefix
、modifiedDateTimeStart
、modifiedDateTimeEnd
和enablePartitionDiscovery
未指定。
- 未指定
示例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
如果源数据存储或格式与上一部分所述的 Snowflake COPY 命令并非以本机方式兼容,请通过临时 Azure Blob 存储实例启用内置暂存复制。 暂存复制功能也能提供更高的吞吐量。 该服务会自动转换数据,以满足 Snowflake 的数据格式要求。 然后,它会调用 COPY 命令将数据载入 Snowflake。 最后,它会从 Blob 存储中清理临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制。
若要使用此功能,请创建一个引用 Azure 存储帐户作为临时暂存位置的 Azure Blob 存储链接服务。 然后,在复制活动中指定 enableStaging
和 stagingSettings
属性。
在接收器中指定
storageIntegration
时,临时暂存 Azure Blob 存储应是你在 Snowflake 中的外部阶段引用的。 请确保在使用 Azure 集成运行时时,为其创建一个 Azure Blob 存储 链接服务,并使用任何支持的身份验证方式。或者,在使用自托管集成运行时时,请使用匿名、帐户密钥、共享访问签名或服务主体身份验证。 此外,在暂存 Azure Blob 存储访问控制 (IAM) 中至少向 Snowflake 服务主体授予存储 Blob 数据读者角色。在接收器中未指定
storageIntegration
时,暂存 Azure Blob 存储链接服务需要使用 Snowflake 的 COPY 命令所需的共享访问签名身份验证。
示例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
在映射数据流中转换数据时,可以从 Snowflake 中的表读取数据以及将数据写入表中。 有关详细信息,请参阅映射数据流中的源转换和接收器转换。 可以选择使用 Snowflake 数据集或内联数据集作为源和接收器类型。
下表列出了 Snowflake 源支持的属性。 你可以在“源选项”选项卡中编辑这些属性。该连接器利用 Snowflake 内部数据传输。
名称 | 描述 | 必需 | 允许的值 | 数据流脚本属性 |
---|---|---|---|---|
表 | 如果选择“表”作为输入,则在使用内联数据集时,数据流会从在 Snowflake 数据集或源选项中指定的表中获取所有数据。 | 否 | 字符串 | (仅适用于内联数据集) 表名称 schemaName |
查询 | 如果选择“查询”作为输入,请输入用于从 Snowflake 中提取数据的查询。 此设置会替代在数据集中选择的任何表。 如果架构、表和列的名称包含小写字母,请在查询中引用对象标识符,例如 select * from "schema"."myTable" 。 |
否 | 字符串 | 查询 |
启用增量提取(预览) | 使用此选项告知 ADF 仅处理自上次执行管道以来已更改的行。 | 否 | 布尔值 | 启用Cdc |
增量列 | 使用增量提取功能时,必须选择要用作源表中水印的日期/时间/数字列。 | 否 | 字符串 | 水印列 |
启用 Snowflake 更改跟踪(预览版) | 此选项使 ADF 能够利用 Snowflake 变更数据捕获技术仅处理自上一个管道执行以来的增量数据。 此选项通过行插入、更新和删除操作自动加载增量数据,而无需任何增量列。 | 否 | 布尔值 | 启用本地Cdc |
净变化 | 使用 Snowflake 更改跟踪时,可以使用此选项获取重复更改的行或详尽的更改。 已删除重复数据的更改行将仅显示自给定时间点以来已更改的行的最新版本,而详尽的更改将显示已更改的每行的所有版本,包括已删除或更新的版本。 例如,如果更新行,你将在详尽更改中看到删除版本和插入版本,但在已删除重复数据的更改行中只看到插入版本。 根据你的用例,你可以选择适合你的需求的选项。 默认选项为 false,这意味着详尽的更改。 | 否 | 布尔值 | 净变化 |
包括系统列 | 使用 snowflake 更改跟踪时,可以使用 systemColumns 选项来控制是将 Snowflake 提供的元数据流列包括在更改跟踪输出中还是排除。 默认情况下,systemColumns 设置为 true,这意味着包含元数据流列。 如果要排除 systemColumns,可以将它们设置为 false。 | 否 | 布尔值 | systemColumns |
从头开始阅读 | 使用增量提取和更改跟踪设置此选项后,将指示 ADF 在首次执行具有增量提取的管道时读取所有行。 | 否 | 布尔值 | 跳过初始加载 |
使用 Snowflake 数据集作为源类型时,关联的数据流脚本为:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
如果使用内联数据集,则关联的数据流脚本为:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Azure 数据工厂现在支持 Snowflake 中一种称为变更跟踪的本机功能,该功能通过日志形式跟踪数据变更。 Snowflake 的此功能允许我们跟踪数据随时间推移的变化,使其可用于增量数据加载和审核目的。 为了利用此功能,当你启用变更数据捕获并选择 Snowflake 更改跟踪时,我们将为源表创建一个 Stream 对象,该对象在源 Snowflake 表上启用更改跟踪。 随后,我们在查询中使用 CHANGES 子句从源表中仅提取新的或更新的数据。 此外,建议计划管道,以便在 Snowflake 源表的数据保留时间设置的时间间隔内使用更改,否则用户可能会在捕获的更改中看到不一致的行为。
下表列出了 Snowflake 接收器支持的属性。 可以在“设置”选项卡中编辑这些属性。使用内联数据集时,你会看到其他设置,这些设置与数据集属性部分所述的属性相同。 该连接器利用 Snowflake 内部数据传输。
名称 | 描述 | 必需 | 允许的值 | 数据流脚本属性 |
---|---|---|---|---|
更新方法 | 指定允许在 Snowflake 目标上执行哪些操作。 若要更新、更新插入或删除行,需要进行“更改行”转换才能标记这些操作的行。 |
是 |
true 或 false |
可删除的 insertable updateable 可插入更新 |
键列 | 对于更新、更新插入和删除操作,必须设置一个或多个键列,以确定要更改的行。 | 否 | 数组 | 密钥 |
表操作 | 确定在写入之前是否从目标表重新创建或删除所有行。 - 无:不会对该表进行任何操作。 - 重新创建:将删除表并重新创建表。 在以动态方式创建新表的情况下是必需的。 - 截断:将删除目标表中的所有行。 |
否 |
true 或 false |
重新创造 截断 |
使用 Snowflake 数据集作为接收器类型时,关联的数据流脚本为:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
如果使用内联数据集,则关联的数据流脚本为:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
通过将管道日志记录级别设置为“无”,我们排除了中间转换指标的传输,防止 Spark 优化的潜在障碍,并启用 Snowflake 提供的查询下推优化。 这种下推优化为具有大量数据集的大型 Snowflake 表提供了实质性的性能增强。
备注
我们不支持 Snowflake 中的临时表,因为它们是会话或创建它们的用户的本地表,因此其他会话无法访问它们,并且容易被 Snowflake 覆盖为常规表。 虽然Snowflake提供了可全局访问的临时表作为替代方案,但它们需要手动删除,这与我们使用临时表的主要目标(即避免在源架构中进行任何删除操作)相矛盾。
有关属性的详细信息,请参阅查找活动。
下表显示了不同版本的 Snowflake 连接器的发布阶段和更改日志:
版本 | 发布阶段 | 更改日志 |
---|---|---|
雪花 V1 | GA版本现已可用 | / |
Snowflake V2 (版本 1.0) | GA版本现已可用 | • 添加对密钥对身份验证的支持。 • accountIdentifier 、warehouse 、database 、schema 和role 属性用于建立连接而不是connectionstring 属性。• 在查找活动中添加对 BigDecimal 的支持。 在 Snowflake 中定义的 NUMBER 类型将在“查找”活动中显示为字符串。 如果要在 V2 中将其转换为数值类型,可以将管道参数与 int 函数 或 float 函数一起使用。 例如, int(activity('lookup').output.firstRow.VALUE) 、float(activity('lookup').output.firstRow.VALUE) • Snowflake 中的时间戳数据类型在 Lookup 和 Script 活动中被视为 DateTimeOffset 数据类型。 如果在升级到 V2 后仍需要将 Datetime 值用作管道中的参数,则可以使用 formatDateTime 函数 或 concat 函数将 DateTimeOffset 类型转换为 DateTime 类型。 例如: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE) 、concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') • 脚本活动中不支持脚本参数。 也可将动态表达式用于脚本参数。 有关详细信息,请参阅 Azure 数据工厂和 Azure Synapse Analytics 中的表达式和函数。 • 不支持在脚本活动中执行多个 SQL 语句。 |
Snowflake V2 (版本 1.1) | 推出预览版 | • 添加对脚本参数的支持。 • 在脚本活动中添加对多个语句执行的支持。 |
若要将 Snowflake 连接器从 V1 升级到 V2,可以并行升级或就地升级。
若要执行并行升级,请完成以下步骤:
- 新建 Snowflake 链接服务,并通过引用 V2 链接服务属性来对其进行配置。
- 基于新创建的 Snowflake 链接服务创建数据集。
- 在面向 V1 对象的管道中,将新的链接服务和数据集替换为现有的链接服务和数据集。
若要执行就地升级,需要编辑现有的链接服务有效负载,并将数据集更新为使用新版链接服务。
将类型从“Snowflake”更新为“SnowflakeV2”。
将链接服务有效负载从 V1 格式修改为 V2。 更改上述类型后,可以从用户界面填充每个字段,也可以直接通过 JSON 编辑器更新有效负载。 有关受支持的连接属性,请参阅本文中链接服务属性部分。 以下示例显示了 V1 和 V2 Snowflake 链接服务的有效负载差异:
Snowflake V1 链接服务 JSON 有效负载:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Snowflake V2 链接服务 JSON 有效负载:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
更新数据集以使用新的链接服务。 可以基于新创建的链接服务创建新数据集,也可以将现有数据集的 type 属性从 SnowflakeTable 更新为 SnowflakeV2Table。
备注
转换链接服务时,重写模板参数部分可能仅显示数据库属性。 可以通过手动编辑参数来解决此问题。 之后, “替代模板参数 ”部分将显示连接字符串。
在 “编辑链接服务 ”页中,为版本选择 1.1。 有关详细信息,请参阅链接服务属性。
要查看由复制活动支持作为源和接收器的数据存储列表,请参阅支持的数据存储和格式。