Copy data to and from SQL Server by using Azure Data Factory

Applies to: Azure Data Factory, Azure Synapse Analytics

This article outlines how to use the copy activity in Azure Data Factory to copy data from and to a SQL Server database. It builds on the copy activity overview article, which presents a general overview of the copy activity.

Supported capabilities

This SQL Server connector is supported for the following activities:

You can copy data from a SQL Server database to any supported sink data store. Or, you can copy data from any supported source data store to a SQL Server database. For a list of data stores that are supported as sources or sinks by the copy activity, see the Supported data stores table.

Specifically, this SQL Server connector supports:

  • SQL Server version 2005 and above.
  • Copying data by using SQL or Windows authentication.
  • As a source, retrieving data by using a SQL query or a stored procedure. You can also choose to parallel copy from a SQL Server source; see the Parallel copy from SQL database section for details.
  • As a sink, automatically creating the destination table if it doesn't exist based on the source schema, and appending data to a table or invoking a stored procedure with custom logic during the copy.

SQL Server Express LocalDB is not supported.

Note

SQL Server Always Encrypted isn't supported by this connector at this time. To work around this, you can use a generic ODBC connector and a SQL Server ODBC driver. Follow this guidance with the ODBC driver download and connection string configurations.
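As a rough sketch of that workaround, a generic ODBC linked service pointing at a SQL Server ODBC driver might look like the following. The driver name and all placeholder values are illustrative assumptions; verify the exact property names against the ODBC connector documentation. The generic ODBC connector requires a self-hosted integration runtime.

```json
{
    "name": "SqlServerOdbcLinkedService",
    "properties": {
        "type": "Odbc",
        "typeProperties": {
            "connectionString": "Driver={ODBC Driver 17 for SQL Server};Server=<servername>;Database=<databasename>;",
            "authenticationType": "Basic",
            "userName": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "<name of self-hosted Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}
```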

Prerequisites

If your data store is located inside an on-premises network, an Azure virtual network, or Amazon Virtual Private Cloud, you need to configure a self-hosted integration runtime to connect to it.

Alternatively, if your data store is a managed cloud data service, you can use the Azure integration runtime. If access is restricted to IPs that are approved in the firewall rules, you can add Azure Integration Runtime IPs to the allow list.

For more information about the network security mechanisms and options supported by Data Factory, see Data access strategies.

Get started

To perform the Copy activity with a pipeline, you can use one of the following tools or SDKs:

The following sections provide details about properties that are used to define Data Factory entities specific to the SQL Server database connector.

Linked service properties

The following properties are supported for the SQL Server linked service:

| Property | Description | Required |
| --- | --- | --- |
| type | The type property must be set to SqlServer. | Yes |
| connectionString | Specify the connectionString information that's needed to connect to the SQL Server database by using either SQL authentication or Windows authentication. Refer to the following samples. You can also put a password in Azure Key Vault. If it's SQL authentication, pull the password configuration out of the connection string. For more information, see the JSON example following the table and Store credentials in Azure Key Vault. | Yes |
| userName | Specify a user name if you use Windows authentication. An example is domainname\username. | No |
| password | Specify a password for the user account you specified for the user name. Mark this field as SecureString to store it securely in Azure Data Factory. Or, you can reference a secret stored in Azure Key Vault. | No |
| connectVia | This integration runtime is used to connect to the data store. Learn more from the Prerequisites section. If not specified, the default Azure integration runtime is used. | No |

Tip

If you hit an error with the error code "UserErrorFailedToConnectToSqlServer" and a message like "The session limit for the database is XXX and has been reached," add Pooling=false to your connection string and try again.
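For reference, disabling pooling is just an extra setting appended to the connection string. Using the same placeholders as the SQL-authentication sample below, it would look like:

```json
"connectionString": "Data Source=<servername>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;Pooling=false;"
```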

Example 1: Use SQL authentication

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Example 2: Use SQL authentication with a password in Azure Key Vault

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;",
            "password": { 
                "type": "AzureKeyVaultSecret", 
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>", 
                    "type": "LinkedServiceReference" 
                }, 
                "secretName": "<secretName>" 
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Example 3: Use Windows authentication

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=True;",
            "userName": "<domain\\username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
     }
}

Dataset properties

For a full list of sections and properties available for defining datasets, see the datasets article. This section provides a list of properties supported by the SQL Server dataset.

To copy data from and to a SQL Server database, the following properties are supported:

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the dataset must be set to SqlServerTable. | Yes |
| schema | Name of the schema. | No for source, Yes for sink |
| table | Name of the table/view. | No for source, Yes for sink |
| tableName | Name of the table/view with schema. This property is supported for backward compatibility. For a new workload, use schema and table. | No for source, Yes for sink |

Example

{
    "name": "SQLServerDataset",
    "properties":
    {
        "type": "SqlServerTable",
        "linkedServiceName": {
            "referenceName": "<SQL Server linked service name>",
            "type": "LinkedServiceReference"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "typeProperties": {
            "schema": "<schema_name>",
            "table": "<table_name>"
        }
    }
}

Copy activity properties

For a full list of sections and properties available for use to define activities, see the Pipelines article. This section provides a list of properties supported by the SQL Server source and sink.

SQL Server as a source

Tip

To load data from SQL Server efficiently by using data partitioning, learn more from Parallel copy from SQL database.

To copy data from SQL Server, set the source type in the copy activity to SqlSource. The following properties are supported in the copy activity source section:

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the copy activity source must be set to SqlSource. | Yes |
| sqlReaderQuery | Use the custom SQL query to read data. An example is select * from MyTable. | No |
| sqlReaderStoredProcedureName | This property is the name of the stored procedure that reads data from the source table. The last SQL statement must be a SELECT statement in the stored procedure. | No |
| storedProcedureParameters | These parameters are for the stored procedure. Allowed values are name or value pairs. The names and casing of parameters must match the names and casing of the stored procedure parameters. | No |
| isolationLevel | Specifies the transaction locking behavior for the SQL source. The allowed values are: ReadCommitted, ReadUncommitted, RepeatableRead, Serializable, Snapshot. If not specified, the database's default isolation level is used. Refer to this doc for more details. | No |
| partitionOptions | Specifies the data partitioning options used to load data from SQL Server. Allowed values are: None (default), PhysicalPartitionsOfTable, and DynamicRange. When a partition option is enabled (that is, not None), the degree of parallelism to concurrently load data from SQL Server is controlled by the parallelCopies setting on the copy activity. | No |
| partitionSettings | Specify the group of settings for data partitioning. Apply when the partition option isn't None. | No |
| Under partitionSettings: | | |
| partitionColumnName | Specify the name of the source column in integer or date/datetime type (int, smallint, bigint, date, smalldatetime, datetime, datetime2, or datetimeoffset) that will be used by range partitioning for parallel copy. If not specified, the index or the primary key of the table is detected automatically and used as the partition column. Apply when the partition option is DynamicRange. If you use a query to retrieve the source data, hook ?AdfDynamicRangePartitionCondition in the WHERE clause. For an example, see the Parallel copy from SQL database section. | No |
| partitionUpperBound | The maximum value of the partition column for partition range splitting. This value is used to decide the partition stride, not for filtering the rows in the table. All rows in the table or query result will be partitioned and copied. If not specified, the copy activity auto-detects the value. Apply when the partition option is DynamicRange. For an example, see the Parallel copy from SQL database section. | No |
| partitionLowerBound | The minimum value of the partition column for partition range splitting. This value is used to decide the partition stride, not for filtering the rows in the table. All rows in the table or query result will be partitioned and copied. If not specified, the copy activity auto-detects the value. Apply when the partition option is DynamicRange. For an example, see the Parallel copy from SQL database section. | No |

Note the following points:

  • If sqlReaderQuery is specified for SqlSource, the copy activity runs this query against the SQL Server source to get the data. You can also specify a stored procedure by specifying sqlReaderStoredProcedureName and storedProcedureParameters if the stored procedure takes parameters.
  • When you use a stored procedure in the source to retrieve data, note that if your stored procedure is designed to return a different schema when a different parameter value is passed in, you may encounter a failure or see an unexpected result when importing schema from the UI or when copying data to the SQL database with auto table creation.

Example: Use a SQL query

"activities":[
    {
        "name": "CopyFromSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Server input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Example: Use a stored procedure

"activities":[
    {
        "name": "CopyFromSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Server input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
                "storedProcedureParameters": {
                    "stringData": { "value": "str3" },
                    "identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

The stored procedure definition

CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
    @stringData varchar(20),
    @identifier int
)
AS
SET NOCOUNT ON;
BEGIN
    select *
    from dbo.UnitTestSrcTable
    where dbo.UnitTestSrcTable.stringData != @stringData
    and dbo.UnitTestSrcTable.identifier != @identifier
END
GO

SQL Server as a sink

Tip

Learn more about the supported write behaviors, configurations, and best practices from Best practice for loading data into SQL Server.

To copy data to SQL Server, set the sink type in the copy activity to SqlSink. The following properties are supported in the copy activity sink section:

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the copy activity sink must be set to SqlSink. | Yes |
| preCopyScript | This property specifies a SQL query for the copy activity to run before writing data into SQL Server. It's invoked only once per copy run. You can use this property to clean up the preloaded data. | No |
| tableOption | Specifies whether to automatically create the sink table, if it doesn't exist, based on the source schema. Auto table creation is not supported when the sink specifies a stored procedure. Allowed values are: none (default), autoCreate. | No |
| sqlWriterStoredProcedureName | The name of the stored procedure that defines how to apply source data into a target table. This stored procedure is invoked per batch. For operations that run only once and have nothing to do with source data, for example, delete or truncate, use the preCopyScript property. See the example from Invoke a stored procedure from a SQL sink. | No |
| storedProcedureTableTypeParameterName | The parameter name of the table type specified in the stored procedure. | No |
| sqlWriterTableType | The table type name to be used in the stored procedure. The copy activity makes the data being moved available in a temp table with this table type. Stored procedure code can then merge the data that's being copied with existing data. | No |
| storedProcedureParameters | Parameters for the stored procedure. Allowed values are name and value pairs. Names and casing of parameters must match the names and casing of the stored procedure parameters. | No |
| writeBatchSize | Number of rows to insert into the SQL table per batch. Allowed values are integers for the number of rows. By default, Azure Data Factory dynamically determines the appropriate batch size based on the row size. | No |
| writeBatchTimeout | This property specifies the wait time for the batch insert operation to complete before it times out. Allowed values are for the timespan. An example is "00:30:00" for 30 minutes. If no value is specified, the timeout defaults to "02:00:00". | No |

Example 1: Append data

"activities":[
    {
        "name": "CopyToSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Server output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlSink",
                "tableOption": "autoCreate",
                "writeBatchSize": 100000
            }
        }
    }
]

Example 2: Invoke a stored procedure during copy

Learn more details from Invoke a stored procedure from a SQL sink.

"activities":[
    {
        "name": "CopyToSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Server output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlSink",
                "sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
                "storedProcedureTableTypeParameterName": "MyTable",
                "sqlWriterTableType": "MyTableType",
                "storedProcedureParameters": {
                    "identifier": { "value": "1", "type": "Int" },
                    "stringData": { "value": "str1" }
                }
            }
        }
    }
]

Parallel copy from SQL database

The SQL Server connector in the copy activity provides built-in data partitioning to copy data in parallel. You can find data partitioning options on the Source tab of the copy activity.

Screenshot of partition options

When you enable partitioned copy, the copy activity runs parallel queries against your SQL Server source to load data by partitions. The parallel degree is controlled by the parallelCopies setting on the copy activity. For example, if you set parallelCopies to four, Data Factory concurrently generates and runs four queries based on your specified partition option and settings, and each query retrieves a portion of data from your SQL Server.

We recommend that you enable parallel copy with data partitioning, especially when you load a large amount of data from your SQL Server. The following are suggested configurations for different scenarios. When copying data into a file-based data store, it's recommended to write to a folder as multiple files (only specify the folder name), in which case the performance is better than writing to a single file.

| Scenario | Suggested settings |
| --- | --- |
| Full load from a large table, with physical partitions. | Partition option: Physical partitions of table.<br><br>During execution, Data Factory automatically detects the physical partitions and copies data by partitions.<br><br>To check whether your table has a physical partition, you can refer to this query. |
| Full load from a large table, without physical partitions, while with an integer or datetime column for data partitioning. | Partition options: Dynamic range partition.<br>Partition column (optional): Specify the column used to partition data. If not specified, the index or primary key column is used.<br>Partition upper bound and partition lower bound (optional): Specify if you want to determine the partition stride. This is not for filtering the rows in the table; all rows in the table will be partitioned and copied. If not specified, the copy activity auto-detects the values.<br><br>For example, if your partition column "ID" has values ranging from 1 to 100, and you set the lower bound as 20 and the upper bound as 80, with parallel copy as 4, Data Factory retrieves data by 4 partitions: IDs in range <=20, [21, 50], [51, 80], and >=81, respectively. |
| Load a large amount of data by using a custom query, without physical partitions, while with an integer or date/datetime column for data partitioning. | Partition options: Dynamic range partition.<br>Query: `SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>`.<br>Partition column: Specify the column used to partition data.<br>Partition upper bound and partition lower bound (optional): Specify if you want to determine the partition stride. This is not for filtering the rows in the table; all rows in the query result will be partitioned and copied. If not specified, the copy activity auto-detects the value.<br><br>During execution, Data Factory replaces `?AdfDynamicRangePartitionCondition` with the actual column name and value ranges for each partition, and sends them to SQL Server.<br>For example, if your partition column "ID" has values ranging from 1 to 100, and you set the lower bound as 20 and the upper bound as 80, with parallel copy as 4, Data Factory retrieves data by 4 partitions: IDs in range <=20, [21, 50], [51, 80], and >=81, respectively.<br><br>Here are more sample queries for different scenarios:<br>1. Query the whole table:<br>`SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition`<br>2. Query from a table with column selection and additional where-clause filters:<br>`SELECT <column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>`<br>3. Query with subqueries:<br>`SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>`<br>4. Query with partition in subquery:<br>`SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition) AS T` |
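To make the dynamic range example above concrete: with lower bound 20, upper bound 80, and parallel copy set to 4, the per-partition queries generated by the service would be shaped roughly like the following. MyTable and ID are the illustrative names from the example; the exact SQL the service emits may differ.

```sql
SELECT * FROM MyTable WHERE [ID] <= 20;
SELECT * FROM MyTable WHERE [ID] > 20 AND [ID] <= 50;
SELECT * FROM MyTable WHERE [ID] > 50 AND [ID] <= 80;
SELECT * FROM MyTable WHERE [ID] > 80;
```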

Best practices to load data with a partition option:

  1. Choose a distinctive column as the partition column (like a primary key or unique key) to avoid data skew.
  2. If the table has a built-in partition, use the partition option "Physical partitions of table" to get better performance.
  3. If you use Azure Integration Runtime to copy data, you can set a larger "Data Integration Units (DIU)" value (>4) to utilize more computing resources. Check the applicable scenarios there.
  4. "Degree of copy parallelism" controls the partition numbers; setting this number too large can sometimes hurt performance. We recommend setting this number as (DIU or number of Self-hosted IR nodes) * (2 to 4).

Example: Full load from a large table with physical partitions

"source": {
    "type": "SqlSource",
    "partitionOption": "PhysicalPartitionsOfTable"
}

Example: Query with dynamic range partition

"source": {
    "type": "SqlSource",
    "query": "SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "<partition_column_name>",
        "partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
        "partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
    }
}

Sample query to check physical partition

SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id 
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id 
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id 
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id 
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id 
WHERE s.name='[your schema]' AND t.name = '[your table name]'

If the table has a physical partition, you would see "HasPartition" as "yes" like the following.

Screenshot of SQL query result

Best practice for loading data into SQL Server

When you copy data into SQL Server, you might require different write behavior:

  • Append: My source data has only new records.
  • Upsert: My source data has both inserts and updates.
  • Overwrite: I want to reload the entire dimension table each time.
  • Write with custom logic: I need extra processing before the final insertion into the destination table.

See the respective sections for how to configure these behaviors in Azure Data Factory, and for best practices.

Append data

Appending data is the default behavior of this SQL Server sink connector. Azure Data Factory does a bulk insert to write to your table efficiently. You can configure the source and sink accordingly in the copy activity.

Upsert data

Option 1: When you have a large amount of data to copy, you can bulk load all records into a staging table by using the copy activity, and then run a stored procedure activity to apply a MERGE or INSERT/UPDATE statement in one shot.

The copy activity currently doesn't natively support loading data into a database temporary table. There is an advanced way to set it up with a combination of multiple activities; refer to Optimize SQL Database Bulk Upsert scenarios. The following shows a sample that uses a permanent table for staging.

As an example, in Azure Data Factory you can create a pipeline with a Copy activity chained with a Stored Procedure activity. The former copies data from your source store into a SQL Server staging table, for example, UpsertStagingTable, as the table name in the dataset. Then the latter invokes a stored procedure to merge source data from the staging table into the target table and clean up the staging table.

Upsert

In your database, define a stored procedure with MERGE logic, like the following example, which is pointed to from the previous stored procedure activity. Assume that the target is the Marketing table with three columns: ProfileID, State, and Category. Do the upsert based on the ProfileID column.

CREATE PROCEDURE [dbo].[spMergeData]
AS
BEGIN
    MERGE [dbo].[Marketing] AS target
    USING UpsertStagingTable AS source
    ON (target.[ProfileID] = source.[ProfileID])
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT MATCHED THEN
        INSERT ([ProfileID], [State], [Category])
        VALUES (source.ProfileID, source.State, source.Category);

    -- Clean up the staging table for the next run
    TRUNCATE TABLE UpsertStagingTable;
END

Option 2: You can choose to invoke a stored procedure within the copy activity. This approach runs each batch (as governed by the writeBatchSize property) in the source table instead of using bulk insert, which is the default approach in the copy activity.

Overwrite the entire table

You can configure the preCopyScript property in a copy activity sink. In this case, for each copy activity that runs, Azure Data Factory runs the script first. Then it runs the copy to insert the data. For example, to overwrite the entire table with the latest data, specify a script to first delete all the records before you bulk load the new data from the source.
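As a sketch, a sink that empties a hypothetical target table before each load could look like this (the table name is a placeholder):

```json
"sink": {
    "type": "SqlSink",
    "preCopyScript": "TRUNCATE TABLE [dbo].[MyTargetTable]"
}
```

TRUNCATE is typically faster than DELETE for clearing a whole table, but a DELETE statement (optionally with a WHERE clause) works here too if you only want to remove a subset of rows.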

Write data with custom logic

The steps to write data with custom logic are similar to those described in the Upsert data section. When you need to apply extra processing before the final insertion of source data into the destination table, you can load the data into a staging table and then invoke a stored procedure activity, or invoke a stored procedure in the copy activity sink to apply the data.

Invoke a stored procedure from a SQL sink

When you copy data into a SQL Server database, you can also configure and invoke a user-specified stored procedure with additional parameters on each batch of the source table. The stored procedure feature takes advantage of table-valued parameters.

You can use a stored procedure when built-in copy mechanisms don't serve the purpose. An example is when you want to apply extra processing before the final insertion of source data into the destination table. Some extra processing examples are when you want to merge columns, look up additional values, and insert data into more than one table.

The following sample shows how to use a stored procedure to do an upsert into a table in the SQL Server database. Assume that the input data and the sink Marketing table each have three columns: ProfileID, State, and Category. Do the upsert based on the ProfileID column, and apply it only for a specific category called "ProductA".

  1. In your database, define the table type with the same name as sqlWriterTableType. The schema of the table type is the same as the schema returned by your input data.

    CREATE TYPE [dbo].[MarketingType] AS TABLE(
        [ProfileID] [varchar](256) NOT NULL,
        [State] [varchar](256) NOT NULL,
        [Category] [varchar](256) NOT NULL
    )
    
  2. In your database, define the stored procedure with the same name as sqlWriterStoredProcedureName. It handles input data from your specified source and merges the data into the output table. The parameter name of the table type in the stored procedure is the same as tableName defined in the dataset.

    CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
    AS
    BEGIN
    MERGE [dbo].[Marketing] AS target
    USING @Marketing AS source
    ON (target.ProfileID = source.ProfileID and target.Category = @category)
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT MATCHED THEN
        INSERT (ProfileID, State, Category)
        VALUES (source.ProfileID, source.State, source.Category);
    END
    
  3. In Azure Data Factory, define the SQL sink section in the copy activity as follows:

    "sink": {
        "type": "SqlSink",
        "sqlWriterStoredProcedureName": "spOverwriteMarketing",
        "storedProcedureTableTypeParameterName": "Marketing",
        "sqlWriterTableType": "MarketingType",
        "storedProcedureParameters": {
            "category": {
                "value": "ProductA"
            }
        }
    }
    

Data type mapping for SQL Server

When you copy data from and to SQL Server, the following mappings are used from SQL Server data types to Azure Data Factory interim data types. To learn how the copy activity maps the source schema and data type to the sink, see Schema and data type mappings.

| SQL Server data type | Azure Data Factory interim data type |
| --- | --- |
| bigint | Int64 |
| binary | Byte[] |
| bit | Boolean |
| char | String, Char[] |
| date | DateTime |
| datetime | DateTime |
| datetime2 | DateTime |
| datetimeoffset | DateTimeOffset |
| decimal | Decimal |
| FILESTREAM attribute (varbinary(max)) | Byte[] |
| float | Double |
| image | Byte[] |
| int | Int32 |
| money | Decimal |
| nchar | String, Char[] |
| ntext | String, Char[] |
| numeric | Decimal |
| nvarchar | String, Char[] |
| real | Single |
| rowversion | Byte[] |
| smalldatetime | DateTime |
| smallint | Int16 |
| smallmoney | Decimal |
| sql_variant | Object |
| text | String, Char[] |
| time | TimeSpan |
| timestamp | Byte[] |
| tinyint | Int16 |
| uniqueidentifier | Guid |
| varbinary | Byte[] |
| varchar | String, Char[] |
| xml | String |

Note

For data types that map to the Decimal interim type, the copy activity currently supports precision up to 28. If you have data that requires precision larger than 28, consider converting it to a string in a SQL query.
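For example, assuming a hypothetical source column Amount declared as decimal(38,10) (the column and table names here are illustrative), the source query in the copy activity could cast the value to a string:

```sql
-- Cast a high-precision decimal column to a string to avoid the
-- 28-digit precision limit of the Decimal interim type.
SELECT CAST(Amount AS varchar(50)) AS Amount
FROM dbo.MySourceTable;
```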

Lookup activity properties

To learn details about the properties, check Lookup activity.

GetMetadata activity properties

To learn details about the properties, check GetMetadata activity.

Using Always Encrypted

When you copy data from/to SQL Server with Always Encrypted, use the generic ODBC connector and a SQL Server ODBC driver via the self-hosted integration runtime. This SQL Server connector does not currently support Always Encrypted.

More specifically:

  1. Set up a self-hosted integration runtime if you don't have one. See the Self-hosted integration runtime article for details.

  2. 此处下载适用于 SQL Server 的 64 位 ODBC 驱动程序,并将其安装在 Integration Runtime 计算机上。Download the 64-bit ODBC driver for SQL Server from here, and install on the Integration Runtime machine. 若要详细了解此驱动程序的工作原理,请参阅在适用于 SQL Server 的 ODBC 驱动程序中使用 Always EncryptedLearn more about how this driver works from Using Always Encrypted with the ODBC Driver for SQL Server.

  3. Create a linked service of ODBC type to connect to your SQL database. To use SQL authentication, specify the ODBC connection string as below, and select Basic authentication to set the user name and password.

    Driver={ODBC Driver 17 for SQL Server};Server=<serverName>;Database=<databaseName>;ColumnEncryption=Enabled;KeyStoreAuthentication=KeyVaultClientSecret;KeyStorePrincipalId=<servicePrincipalId>;KeyStoreSecret=<servicePrincipalKey>
    
  4. Create the dataset and copy activity with ODBC type accordingly. Learn more from the ODBC connector article.
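Putting steps 3 and 4 together, an ODBC linked service for this scenario might look like the following sketch (names such as OdbcSqlServerLinkedService and MySelfHostedIR are placeholders, and the angle-bracket values must be filled in for your environment):

```json
{
    "name": "OdbcSqlServerLinkedService",
    "properties": {
        "type": "Odbc",
        "typeProperties": {
            "connectionString": "Driver={ODBC Driver 17 for SQL Server};Server=<serverName>;Database=<databaseName>;ColumnEncryption=Enabled;KeyStoreAuthentication=KeyVaultClientSecret;KeyStorePrincipalId=<servicePrincipalId>;KeyStoreSecret=<servicePrincipalKey>",
            "authenticationType": "Basic",
            "userName": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "MySelfHostedIR",
            "type": "IntegrationRuntimeReference"
        }
    }
}
```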

Troubleshoot connection issues

  1. Configure your SQL Server instance to accept remote connections. Start SQL Server Management Studio, right-click the server, and select Properties. Select Connections from the list, and select the Allow remote connections to this server check box.

    (Screenshot: Enable remote connections)

    For detailed steps, see Configure the remote access server configuration option.

  2. Start SQL Server Configuration Manager. Expand SQL Server Network Configuration for the instance you want, and select Protocols for MSSQLSERVER. Protocols appear in the right pane. Enable TCP/IP by right-clicking TCP/IP and selecting Enable.

    (Screenshot: Enable TCP/IP)

    For more information and alternate ways of enabling the TCP/IP protocol, see Enable or disable a server network protocol.

  3. In the same window, double-click TCP/IP to launch the TCP/IP Properties window.

  4. Switch to the IP Addresses tab. Scroll down to see the IPAll section. Write down the TCP Port. The default is 1433.

  5. Create a rule for the Windows Firewall on the machine to allow incoming traffic through this port.

  6. Verify the connection: To connect to SQL Server by using a fully qualified name, use SQL Server Management Studio from a different machine. An example is "<machine>.<domain>.corp.<company>.com,1433".
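As an additional check from an existing session (for example, in SQL Server Management Studio), the following query returns the TCP port the current connection uses, which should match the port you noted in step 4:

```sql
-- Returns the TCP port of the current connection;
-- NULL for non-TCP sessions (e.g., shared memory connections)
SELECT local_tcp_port
FROM sys.dm_exec_connections
WHERE session_id = @@SPID;
```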

Next steps

For a list of data stores supported as sources and sinks by the copy activity in Azure Data Factory, see Supported data stores.