Migrate data from Amazon S3 to Azure Data Lake Storage Gen2

APPLIES TO: Azure Data Factory (not Azure Synapse Analytics (Preview))

Use the templates to migrate petabytes of data consisting of hundreds of millions of files from Amazon S3 to Azure Data Lake Storage Gen2.

Note

If you want to copy a small data volume from AWS S3 to Azure (for example, less than 10 TB), it's easier and more efficient to use the Azure Data Factory Copy Data tool. The templates described in this article are more than what you need for that scenario.

About the solution templates

Data partitioning is recommended, especially when you migrate more than 10 TB of data. To partition the data, use the 'prefix' setting to filter the folders and files in Amazon S3 by name, so that each ADF copy job copies one partition at a time. You can run multiple ADF copy jobs concurrently for better throughput.

Data migration normally requires a one-time historical data migration plus periodic synchronization of changes from AWS S3 to Azure. There are two templates below: one covers the one-time historical data migration, and the other covers synchronizing the changes from AWS S3 to Azure.

For the template to migrate historical data from Amazon S3 to Azure Data Lake Storage Gen2

This template (template name: migrate historical data from AWS S3 to Azure Data Lake Storage Gen2) assumes that you have written a partition list in an external control table in Azure SQL Database. It uses a Lookup activity to retrieve the partition list from the external control table, iterates over each partition, and has each ADF copy job copy one partition at a time. After each copy job completes, a Stored Procedure activity updates the status of that partition in the control table.

The template contains five activities:

  • Lookup retrieves the partitions that have not yet been copied to Azure Data Lake Storage Gen2 from an external control table. The table name is s3_partition_control_table, and the query to load data from the table is "SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0".
  • ForEach gets the partition list from the Lookup activity and iterates over each partition, passing it to the TriggerCopy activity. You can set batchCount to run multiple ADF copy jobs concurrently. This template sets it to 2.
  • ExecutePipeline executes the CopyFolderPartitionFromS3 pipeline. Using a separate pipeline so that each copy job copies one partition makes it easy to rerun a failed copy job and reload that specific partition from AWS S3 again; the copy jobs loading the other partitions are not affected.
  • Copy copies each partition from AWS S3 to Azure Data Lake Storage Gen2.
  • SqlServerStoredProcedure updates the status of copying each partition in the control table.

The template contains two parameters:

  • AWS_S3_bucketName is the name of the bucket in AWS S3 that you want to migrate data from. If you want to migrate data from multiple buckets in AWS S3, you can add one more column to your external control table to store the bucket name for each partition, and update your pipeline to retrieve data from that column accordingly (see the sketch after this list).
  • Azure_Storage_fileSystem is the name of the file system in Azure Data Lake Storage Gen2 that you want to migrate data to.
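
For the multi-bucket scenario mentioned for AWS_S3_bucketName, the following T-SQL is a minimal sketch only (it is not part of the template); it assumes the control table created in the steps later in this article, and the pipeline would still need to be updated to consume the extra column:

    -- Sketch only (not part of the template): add a bucket-name column to the control table
    -- so that one table can drive copies from multiple S3 buckets.
    ALTER TABLE [dbo].[s3_partition_control_table]
        ADD [AWS_S3_bucketName] [varchar](255) NULL;

    -- The Lookup query would then return the bucket name together with the prefix, for example:
    -- SELECT AWS_S3_bucketName, PartitionPrefix
    -- FROM s3_partition_control_table
    -- WHERE SuccessOrFailure = 0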

For the template to copy changed files only from Amazon S3 to Azure Data Lake Storage Gen2

This template (template name: copy delta data from AWS S3 to Azure Data Lake Storage Gen2) uses the LastModifiedTime of each file to copy only new or updated files from AWS S3 to Azure. Be aware that if your files or folders on AWS S3 are already time-partitioned, with time-slice information as part of the file or folder name (for example, /yyyy/mm/dd/file.csv), you can go to this tutorial for a more performant approach to incrementally loading new files. This template assumes that you have written a partition list in an external control table in Azure SQL Database. It uses a Lookup activity to retrieve the partition list from the external control table, iterates over each partition, and has each ADF copy job copy one partition at a time. When each copy job starts to copy files from AWS S3, it relies on the LastModifiedTime property to identify and copy only the new or updated files. After each copy job completes, a Stored Procedure activity updates the status of that partition in the control table.

The template contains seven activities:

  • Lookup retrieves the partitions from an external control table. The table name is s3_partition_delta_control_table, and the query to load data from the table is "select distinct PartitionPrefix from s3_partition_delta_control_table".
  • ForEach gets the partition list from the Lookup activity and iterates over each partition, passing it to the TriggerDeltaCopy activity. You can set batchCount to run multiple ADF copy jobs concurrently. This template sets it to 2.
  • ExecutePipeline executes the DeltaCopyFolderPartitionFromS3 pipeline. Using a separate pipeline so that each copy job copies one partition makes it easy to rerun a failed copy job and reload that specific partition from AWS S3 again; the copy jobs loading the other partitions are not affected.
  • Lookup retrieves the last copy job run time from the external control table so that the new or updated files can be identified via LastModifiedTime. The table name is s3_partition_delta_control_table, and the query to load data from the table is "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' and SuccessOrFailure = 1".
  • Copy copies only the new or changed files for each partition from AWS S3 to Azure Data Lake Storage Gen2. The modifiedDatetimeStart property is set to the last copy job run time, and the modifiedDatetimeEnd property is set to the current copy job run time. Be aware that both times are interpreted in the UTC time zone (see the note after this list).
  • SqlServerStoredProcedure updates the status of copying each partition and the copy run time in the control table when the copy succeeds. The SuccessOrFailure column is set to 1.
  • SqlServerStoredProcedure updates the status of copying each partition and the copy run time in the control table when the copy fails. The SuccessOrFailure column is set to 0.
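
Because modifiedDatetimeStart and modifiedDatetimeEnd are evaluated in UTC, any JobRunTime values you record in the control table yourself should also be in UTC. A minimal T-SQL sketch (not part of the template) of producing such a value:

    -- Illustrative only: capture the current time in UTC when recording a copy run manually.
    SELECT GETUTCDATE() AS JobRunTimeUtc;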

The template contains two parameters:

  • AWS_S3_bucketName is the name of the bucket in AWS S3 that you want to migrate data from. If you want to migrate data from multiple buckets in AWS S3, you can add one more column to your external control table to store the bucket name for each partition, and update your pipeline to retrieve data from that column accordingly.
  • Azure_Storage_fileSystem is the name of the file system in Azure Data Lake Storage Gen2 that you want to migrate data to.

How to use these two solution templates

For the template to migrate historical data from Amazon S3 to Azure Data Lake Storage Gen2

  1. Create a control table in Azure SQL Database to store the partition list of AWS S3.

    Note

    The table name is s3_partition_control_table. The schema of the control table is PartitionPrefix and SuccessOrFailure, where PartitionPrefix is the prefix setting used to filter the folders and files in Amazon S3 by name, and SuccessOrFailure is the status of copying each partition: 0 means the partition has not been copied to Azure, and 1 means the partition has been copied to Azure successfully. Five partitions are defined in the control table, and the default status for each partition is 0.

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. Create a stored procedure in the same Azure SQL Database for the control table.

    Note

    The name of the stored procedure is sp_update_partition_success. It is invoked by the SqlServerStoredProcedure activity in your ADF pipeline.

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
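
    You can optionally verify the stored procedure manually before running the pipeline. The following is illustrative only; the prefix 'a' is one of the sample partitions inserted in step 1:

    -- Optional manual check (illustrative only): mark partition 'a' as copied, then review the table.
    EXEC [dbo].[sp_update_partition_success] @PartPrefix = 'a';
    SELECT * FROM s3_partition_control_table;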
    
  3. Go to the Migrate historical data from AWS S3 to Azure Data Lake Storage Gen2 template. Input the connections to your external control table, to AWS S3 as the data source store, and to Azure Data Lake Storage Gen2 as the destination store. Be aware that the external control table and the stored procedure reference the same connection.

    Create a new connection

  4. Select Use this template.

    Use this template

  5. You see that 2 pipelines and 3 datasets were created, as shown in the following example:

    Review pipelines and datasets

  6. Select Debug, enter the Parameters, and then select Finish.

    Select Debug

  7. You see results that are similar to the following example:

    Review results

For the template to copy changed files only from Amazon S3 to Azure Data Lake Storage Gen2

  1. Create a control table in Azure SQL Database to store the partition list of AWS S3.

    Note

    The table name is s3_partition_delta_control_table. The schema of the control table is PartitionPrefix, JobRunTime, and SuccessOrFailure, where PartitionPrefix is the prefix setting used to filter the folders and files in Amazon S3 by name, JobRunTime is the datetime value when a copy job runs, and SuccessOrFailure is the status of copying each partition: 0 means the partition has not been copied to Azure, and 1 means the partition has been copied to Azure successfully. Five partitions are defined in the control table. The default value for JobRunTime can be the time when the one-time historical data migration starts; the ADF copy activity will copy the files on AWS S3 that were last modified after that time. The default status for each partition is 1.

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. Create a stored procedure in the same Azure SQL Database for the control table.

    Note

    The name of the stored procedure is sp_insert_partition_JobRunTime_success. It is invoked by the SqlServerStoredProcedure activities in your ADF pipeline.

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
        (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
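
    Optionally, you can test the stored procedure and confirm which timestamp the template's second Lookup activity would pick up. The values below are illustrative only:

    -- Illustrative only: record a successful run for partition 'a', then query the latest run time,
    -- mirroring the Lookup query used by the template.
    EXEC [dbo].[sp_insert_partition_JobRunTime_success] 'a', '2019-02-01 00:00:00', 1;

    SELECT MAX(JobRunTime) AS LastModifiedTime
    FROM s3_partition_delta_control_table
    WHERE PartitionPrefix = 'a' AND SuccessOrFailure = 1;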
    
  3. Go to the Copy delta data from AWS S3 to Azure Data Lake Storage Gen2 template. Input the connections to your external control table, to AWS S3 as the data source store, and to Azure Data Lake Storage Gen2 as the destination store. Be aware that the external control table and the stored procedure reference the same connection.

    Create a new connection

  4. Select Use this template.

    Use this template

  5. You see that 2 pipelines and 3 datasets were created, as shown in the following example:

    Review pipelines and datasets

  6. Select Debug, enter the Parameters, and then select Finish.

    Select Debug

  7. You see results that are similar to the following example:

    Review results

  8. You can also check the results in the control table with the query "select * from s3_partition_delta_control_table". You will see output similar to the following example:

    Review results
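
    A variation on that query (not part of the template) summarizes the most recent successful run per partition, which is the value the delta template's second Lookup activity relies on:

    SELECT PartitionPrefix, MAX(JobRunTime) AS LastSuccessfulRun
    FROM s3_partition_delta_control_table
    WHERE SuccessOrFailure = 1
    GROUP BY PartitionPrefix;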

Next steps