Copy and transform data in Snowflake by using Azure Data Factory

APPLIES TO: Azure Data Factory; Azure Synapse Analytics (Preview)

This article outlines how to use the Copy activity in Azure Data Factory to copy data from and to Snowflake. For more information about Data Factory, see the introductory article.

Supported capabilities

This Snowflake connector is supported for the following activities:

For the Copy activity, this Snowflake connector supports the following functions:

  • Copy data from Snowflake, utilizing Snowflake's COPY into [location] command to achieve the best performance.
  • Copy data to Snowflake, taking advantage of Snowflake's COPY into [table] command to achieve the best performance. It supports Snowflake on Azure.

Get started

To perform the Copy activity with a pipeline, you can use one of the following tools or SDKs:

The following sections provide details about the properties that define Data Factory entities specific to the Snowflake connector.

Linked service properties

The following properties are supported for the Snowflake linked service.

Property: type
Description: The type property must be set to Snowflake.
Required: Yes

Property: connectionString
Description: Specifies the information needed to connect to the Snowflake instance. You can choose to put the password or the entire connection string in Azure Key Vault. Refer to the examples below this table and to the Store credentials in Azure Key Vault article for more details.
Some typical settings:
- Account name: The full account name of your Snowflake account, including the additional segments that identify the region and cloud platform.
- User name: The login name of the user for the connection.
- Password: The password for the user.
- Database: The default database to use once connected. It should be an existing database for which the specified role has privileges.
- Warehouse: The virtual warehouse to use once connected. It should be an existing warehouse for which the specified role has privileges.
- Role: The default access control role to use in the Snowflake session. It should be an existing role that has already been assigned to the specified user. The default role is PUBLIC.
Required: Yes

Property: connectVia
Description: The integration runtime that is used to connect to the data store. You can use the Azure integration runtime or a self-hosted integration runtime (if your data store is located in a private network). If not specified, the default Azure integration runtime is used.
Required: No
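As a quick illustration of how the typical settings above combine into the connectionString value, the following sketch assembles the JDBC-style string used in the examples below. The password is supplied separately in the linked service, so it is not part of this string. The helper name `build_connection_string` is hypothetical, for illustration only, and is not part of any ADF SDK.

```python
from urllib.parse import urlencode

def build_connection_string(account: str, user: str, database: str,
                            warehouse: str, role: str = "PUBLIC") -> str:
    """Assemble the JDBC-style connection string the linked service expects.

    `account` is the full account name, including the additional segments
    that identify the region and cloud platform.
    """
    params = urlencode({"user": user, "db": database,
                        "warehouse": warehouse, "role": role})
    return f"jdbc:snowflake://{account}.snowflakecomputing.com/?{params}"

# Placeholder values, matching the <accountname>/<username> slots below:
print(build_connection_string("xy12345.east-us-2.azure", "loader",
                              "SALES_DB", "LOAD_WH", "SYSADMIN"))
```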

Example:

{
    "name": "SnowflakeLinkedService",
    "properties": {
        "type": "Snowflake",
        "typeProperties": {
            "connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&db=<database>&warehouse=<warehouse>&role=<myRole>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Password in Azure Key Vault:

{
    "name": "SnowflakeLinkedService",
    "properties": {
        "type": "Snowflake",
        "typeProperties": {
            "connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&db=<database>&warehouse=<warehouse>&role=<myRole>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Dataset properties

For a full list of sections and properties available for defining datasets, see the Datasets article.

The following properties are supported for the Snowflake dataset.

Property: type
Description: The type property of the dataset must be set to SnowflakeTable.
Required: Yes

Property: schema
Description: Name of the schema. Note that the schema name is case-sensitive in ADF.
Required: No for source, yes for sink

Property: table
Description: Name of the table/view. Note that the table name is case-sensitive in ADF.
Required: No for source, yes for sink

Example:

{
    "name": "SnowflakeDataset",
    "properties": {
        "type": "SnowflakeTable",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Copy activity properties

For a full list of sections and properties available for defining activities, see the Pipelines article. This section provides a list of properties supported by the Snowflake source and sink.

Snowflake as the source

The Snowflake connector utilizes Snowflake's COPY into [location] command to achieve the best performance.

If the sink data store and format are natively supported by the Snowflake COPY command, you can use the Copy activity to copy directly from Snowflake to the sink. For details, see Direct copy from Snowflake. Otherwise, use the built-in Staged copy from Snowflake.

To copy data from Snowflake, the following properties are supported in the Copy activity source section.

Property: type
Description: The type property of the Copy activity source must be set to SnowflakeSource.
Required: Yes

Property: query
Description: Specifies the SQL query to read data from Snowflake. If the names of the schema, table, and columns contain lowercase characters, quote the object identifiers in the query, for example select * from "schema"."myTable". Executing a stored procedure is not supported.
Required: No

Property: exportSettings
Description: Advanced settings used to retrieve data from Snowflake. You can configure the options supported by the COPY into command, which Data Factory passes through when it invokes the statement.
Required: No

Under exportSettings:

Property: type
Description: The type of export command, set to SnowflakeExportCopyCommand.
Required: Yes

Property: additionalCopyOptions
Description: Additional copy options, provided as a dictionary of key-value pairs. Examples: MAX_FILE_SIZE, OVERWRITE. For more information, see Snowflake Copy Options.
Required: No

Property: additionalFormatOptions
Description: Additional file format options that are provided to the COPY command as a dictionary of key-value pairs. Examples: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. For more information, see Snowflake Format Type Options.
Required: No
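Because Snowflake resolves unquoted identifiers as uppercase, the query property needs quoted identifiers whenever schema, table, or column names contain lowercase characters, as noted above. A minimal sketch of building such a query (the `quote_identifier` and `build_query` helper names are hypothetical, for illustration only):

```python
def quote_identifier(name: str) -> str:
    """Double-quote a Snowflake identifier, escaping embedded double quotes,
    so its case is preserved exactly as written."""
    return '"' + name.replace('"', '""') + '"'

def build_query(schema: str, table: str) -> str:
    """Build a SELECT statement suitable for the source `query` property."""
    return f"select * from {quote_identifier(schema)}.{quote_identifier(table)}"

print(build_query("schema", "myTable"))  # select * from "schema"."myTable"
```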

Direct copy from Snowflake

If your sink data store and format meet the criteria described in this section, you can use the Copy activity to copy directly from Snowflake to the sink. Data Factory checks the settings and fails the Copy activity run if the following criteria are not met:

  • The sink linked service is Azure Blob storage with shared access signature authentication.

  • The sink data format is Parquet, delimited text, or JSON, with the following configurations:

    • For Parquet format, the compression codec is None, Snappy, or Lzo.
    • For delimited text format:
      • rowDelimiter is \r\n or any single character.
      • compression can be no compression, gzip, bzip2, or deflate.
      • encodingName is left as default or set to utf-8.
      • quoteChar is double quote, single quote, or empty string (no quote character).
    • For JSON format, direct copy supports only the case in which the source Snowflake table or query result has a single column and the data type of that column is VARIANT, OBJECT, or ARRAY.
      • compression can be no compression, gzip, bzip2, or deflate.
      • encodingName is left as default or set to utf-8.
      • filePattern in the Copy activity sink is left as default or set to setOfObjects.
  • In the Copy activity source, additionalColumns is not specified.

  • Column mapping is not specified.

Example:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeSource",
                "sqlReaderQuery": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    }
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Staged copy from Snowflake

When your sink data store or format is not natively compatible with the Snowflake COPY command, as described in the previous section, enable the built-in staged copy using an interim Azure Blob storage instance. The staged copy feature also provides better throughput. Data Factory exports data from Snowflake into the staging storage, then copies the data to the sink, and finally cleans up your temporary data from the staging storage. See Staged copy for details about copying data by using staging.

To use this feature, create an Azure Blob storage linked service that refers to the Azure storage account as the interim staging. Then specify the enableStaging and stagingSettings properties in the Copy activity.

Note

The staging Azure Blob storage linked service must use shared access signature authentication, as required by the Snowflake COPY command.

Example:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeSource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Snowflake as sink

The Snowflake connector utilizes Snowflake's COPY into [table] command to achieve the best performance. It supports writing data to Snowflake on Azure.

If the source data store and format are natively supported by the Snowflake COPY command, you can use the Copy activity to copy directly from the source to Snowflake. For details, see Direct copy to Snowflake. Otherwise, use the built-in Staged copy to Snowflake.

To copy data to Snowflake, the following properties are supported in the Copy activity sink section.

Property: type
Description: The type property of the Copy activity sink, set to SnowflakeSink.
Required: Yes

Property: preCopyScript
Description: Specify a SQL query for the Copy activity to run before writing data into Snowflake in each run. Use this property to clean up the preloaded data.
Required: No

Property: importSettings
Description: Advanced settings used to write data into Snowflake. You can configure the options supported by the COPY into command, which Data Factory passes through when it invokes the statement.
Required: No

Under importSettings:

Property: type
Description: The type of import command, set to SnowflakeImportCopyCommand.
Required: Yes

Property: additionalCopyOptions
Description: Additional copy options, provided as a dictionary of key-value pairs. Examples: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. For more information, see Snowflake Copy Options.
Required: No

Property: additionalFormatOptions
Description: Additional file format options provided to the COPY command, as a dictionary of key-value pairs. Examples: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. For more information, see Snowflake Format Type Options.
Required: No
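For instance, a sink that empties the target table before each load might combine preCopyScript with the import settings described above. This is a sketch, not a complete activity definition, and the table name MYSCHEMA.MYTABLE is a placeholder:

```json
"sink": {
    "type": "SnowflakeSink",
    "preCopyScript": "TRUNCATE TABLE MYSCHEMA.MYTABLE",
    "importSettings": {
        "type": "SnowflakeImportCopyCommand"
    }
}
```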

Direct copy to Snowflake

If your source data store and format meet the criteria described in this section, you can use the Copy activity to copy directly from the source to Snowflake. Azure Data Factory checks the settings and fails the Copy activity run if the following criteria are not met:

  • The source linked service is Azure Blob storage with shared access signature authentication.

  • The source data format is Parquet, delimited text, or JSON, with the following configurations:

    • For Parquet format, the compression codec is None or Snappy.

    • For delimited text format:

      • rowDelimiter is \r\n or any single character. If the row delimiter is not "\r\n", firstRowAsHeader needs to be false, and skipLineCount is not specified.
      • compression can be no compression, gzip, bzip2, or deflate.
      • encodingName is left as default or set to "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", or "WINDOWS-1255".
      • quoteChar is double quote, single quote, or empty string (no quote character).
    • For JSON format, direct copy supports only the case in which the sink Snowflake table has a single column and the data type of that column is VARIANT, OBJECT, or ARRAY.

      • compression can be no compression, gzip, bzip2, or deflate.
      • encodingName is left as default or set to utf-8.
      • Column mapping is not specified.
  • In the Copy activity source:

    • additionalColumns is not specified.
    • If your source is a folder, recursive is set to true.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd, and enablePartitionDiscovery are not specified.

Example:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeSink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE",
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD",
                    }
                }
            }
        }
    }
]

Staged copy to Snowflake

When your source data store or format is not natively compatible with the Snowflake COPY command, as described in the previous section, enable the built-in staged copy using an interim Azure Blob storage instance. The staged copy feature also provides better throughput. Data Factory automatically converts the data to meet the data format requirements of Snowflake. It then invokes the COPY command to load the data into Snowflake. Finally, it cleans up your temporary data from the blob storage. See Staged copy for details about copying data using staging.

To use this feature, create an Azure Blob storage linked service that refers to the Azure storage account as the interim staging. Then specify the enableStaging and stagingSettings properties in the Copy activity.

Note

The staging Azure Blob storage linked service must use shared access signature authentication, as required by the Snowflake COPY command.

Example:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeSink"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Lookup activity properties

For more information about the properties, see Lookup activity.

Next steps

For a list of data stores supported as sources and sinks by the Copy activity in Data Factory, see Supported data stores and formats.