复制数据并发送有关成功和失败的电子邮件通知

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

在本教程中,我们将创建一个数据工厂管道来展示某些控制流功能。 此管道执行从 Azure Blob 存储容器中某个容器到同一存储帐户中另一个容器的简单复制。 如果复制活动成功,该管道会在告知成功结果的电子邮件中发送成功复制操作的详细信息(例如写入的数据量)。 如果复制活动失败,该管道会在告知失败结果的电子邮件中发送复制失败的详细信息(例如错误消息)。 整个教程讲解了如何传递参数。

方案的综合概述:Diagram shows Azure Blob Storage, which is the target of a copy, which, on success, sends an email with details or, on failure, sends an email with error details.

在本教程中执行以下步骤:

  • 创建数据工厂。
  • 创建 Azure 存储链接服务。
  • 创建 Azure Blob 数据集
  • 创建包含“复制”活动和“Web”活动的管道
  • 将活动的输出发送到后续活动
  • 利用参数传递和系统变量
  • 启动管道运行
  • 监视管道和活动运行

本教程使用 Azure 门户。 可以使用其他机制来与 Azure 数据工厂交互,具体请参阅目录中的“快速入门”。

先决条件

  • Azure 订阅。 如果没有 Azure 订阅,请在开始前创建一个试用帐户
  • Azure 存储帐户。 可将 Blob 存储用作数据存储。 如果没有 Azure 存储帐户,请参阅创建存储帐户一文获取创建步骤。
  • Azure SQL 数据库。 将数据库用作接收器数据存储。 如果没有 Azure SQL 数据库中的数据库,请参阅创建 Azure SQL 数据库中的数据库一文了解创建步骤。

创建 Blob 表

  1. 启动记事本。 复制以下文本并在磁盘上将其另存为 input.txt 文件。

    John,Doe
    Jane,Doe
    
  2. 使用 Azure 存储资源管理器等工具执行以下步骤:

    1. 创建 adfv2branch 容器。
    2. adfv2branch 容器中创建 input 文件夹。
    3. input.txt 文件上传到该容器。

创建电子邮件工作流终结点

若要从管道触发电子邮件发送,请使用 Azure 逻辑应用来定义工作流。 有关创建逻辑应用工作流的详细信息,请参阅创建示例消耗逻辑应用工作流

成功电子邮件工作流

创建名为“CopySuccessEmail”消耗逻辑应用工作流。 添加名为“收到 HTTP 请求时”的“请求”触发器,并添加名为“发送电子邮件”的 Office 365 Outlook 操作。 如果出现提示,请登录到 Office 365 Outlook 帐户。

Shows a screenshot of the Success email workflow.

在该“请求”触发器中,使用以下 JSON 填写请求正文 JSON 架构方框:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

工作流设计器中的“请求”触发器应如下图所示:

Shows a screenshot of the workflow designer with Request trigger.

对于“发送电子邮件”操作,请使用传入请求正文 JSON 架构的属性来自定义如何设置电子邮件的格式。 下面是一个示例:

Shows a screenshot of the workflow designer with the action named Send an email.

保存工作流。 记下成功电子邮件工作流的 HTTP Post 请求 URL:

//Success Request Url
https://prodxxx.chinaeast.logic.azure.cn:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

失败电子邮件工作流

遵循相同的步骤创建另一个名为“CopyFailEmail”的逻辑应用工作流。 在“请求”触发器中,请求正文 JSON 架构值是相同的。 更改电子邮件的格式(例如 Subject)即可定制失败电子邮件。 以下是示例:

Shows a screenshot of the workflow designer with the fail email workflow.

保存工作流。 记下失败电子邮件工作流的 HTTP Post 请求 URL:

//Fail Request Url
https://prodxxx.chinaeast.logic.azure.cn:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

现在,应有两个工作流 URL:

//Success Request Url
https://prodxxx.chinaeast.logic.azure.cn:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.chinaeast.logic.azure.cn:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

创建数据工厂

  1. 启动 Microsoft EdgeGoogle Chrome Web 浏览器。 目前,仅 Microsoft Edge 和 Google Chrome Web 浏览器支持数据工厂 UI。

  2. 展开左上角的菜单,选择“创建资源”。 然后选择 >“数据 + 分析”>“数据工厂”:

    Shows a screenshot of the Data Factory selection in the "New" pane.

  3. 在“新建数据工厂”页中,输入 ADFTutorialDataFactory 作为名称

    New data factory page

    Azure 数据工厂的名称必须 全局唯一。 如果收到错误,请更改数据工厂的名称(例如改为 yournameADFTutorialDataFactory),并重新尝试创建。 有关数据工厂项目命名规则,请参阅数据工厂 - 命名规则一文。

    数据工厂名“ADFTutorialDataFactory”不可用。

  4. 选择要在其中创建数据工厂的 Azure 订阅

  5. 对于资源组,请执行以下步骤之一:

    • 选择“使用现有资源组”,并从下拉列表选择现有的资源组。

    • 选择“新建”,并输入资源组的名称。

      若要了解有关资源组的详细信息,请参阅 使用资源组管理 Azure 资源

  6. 选择数据工厂的位置。 下拉列表中仅显示支持的位置。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。

  7. 选择“固定到仪表板”。

  8. 单击创建

  9. 创建完成后,可以看到图中所示的“数据工厂”页。

    Shows a screenshot of the data factory home page.

  10. 单击“打开 Azure 数据工厂工作室”磁贴,以便在单独的标签页中启动 Azure 数据工厂用户界面 (UI)。

创建管道

本步骤创建包含一个“复制”活动和两个“Web”活动的管道。 使用以下功能创建管道:

  • 数据集访问的管道参数。
  • 用于调用逻辑应用工作流发送成功/失败电子邮件的 Web 活动。
  • 将一个活动连接到另一个活动(成功和失败时)
  • 使用一个活动的输出作为后续活动的输入
  1. 在数据工厂 UI 的主页中,单击“协调”磁贴。

    Shows a screenshot of the data factory home page with the Orchestrate tile highlighted.

  2. 在管道的属性窗口中切换到“参数”选项卡,并使用“新建”按钮添加字符串类型的以下三个参数:sourceBlobContainer、sinkBlobContainer 和 receiver。

    • sourceBlobContainer – 源 Blob 数据集使用的管道中的参数。
    • sinkBlobContainer - 接收器 Blob 数据集使用的管道中的参数
    • receiver - 此参数由管道中的两个 Web 活动用来向其电子邮件地址已通过此参数指定的接收方发送成功或失败电子邮件。

    Shows a screenshot of the New pipeline menu.

  3. 在“活动”工具箱中搜索“复制”,将“复制”活动拖放到管道设计器图面。

    Shows a screenshot demonstrating how to drag and drop the copy activity onto the pipeline designer.

  4. 选择拖到管道设计器图面上的“复制”活动。 在底部“复制”活动的“属性”窗口中切换到“源”选项卡,然后单击“+ 新建”。 此步骤创建复制活动的源数据集。

    Screenshot that shows how to create a source dataset for the copy activity.

  5. 在“新建数据集”窗口中,依次选择顶部的“Azure”选项卡、“Azure Blob 存储”、“继续”。

    Shows a screenshot of the select Azure Blob Storage button.

  6. 在“选择格式”窗口中,选择“DelimitedText”,然后选择“继续”。

    Shows a screenshot of the "Select Format" window with the DelimitedText format highlighted.

  7. 此时会显示标题为“设置属性”的新选项卡。 将数据集的名称更改为 SourceBlobDataset。 选择“链接服务”下拉列表,然后选择“+新建”以创建源数据集的新链接服务。

    Shows a screenshot of the "Set properties" window for the dataset, with the "+New" button highlighted under the "Linked service" dropdown.**

  8. 此时会出现“新建链接服务”窗口,你可以在其中填写链接服务所需的属性。

    Shows a screenshot fo the dataset connection window with the new linked service button highlighted.

  9. 在“新建链接服务” 窗口中完成以下步骤:

    1. 输入 AzureStorageLinkedService 作为名称
    2. 选择自己 Azure 存储帐户作为存储帐户名称
    3. 单击“创建”。
  10. 在接下来显示的“设置属性”窗口中,选择“打开此数据集”以输入文件名的参数化值。

    Shows a screenshot of the dataset "Set properties" window with the "Open this dataset" link highlighted.

  11. 输入 @pipeline().parameters.sourceBlobContainer 作为文件夹,输入 emp.txt 作为文件名。

    Shows a screenshot of the source dataset settings.

  12. 切换回“管道”选项卡(或单击左侧树视图中的管道),然后选择设计器上的“复制”活动。 确认为“源数据集”选择了新数据集。

    Shows a screenshot of the source dataset.

  13. 在属性窗口中切换到“接收器”选项卡,针对“接收器数据集”单击“+ 新建”。 像创建源数据集一样,在此步骤中创建复制活动的接收器数据集。

    Shows a screenshot of the new sink dataset button

  14. 在“新建数据集”窗口中,选择“Azure Blob 存储”,单击“继续”,接着在“选择格式”窗口中再次选择“DelimitedText”,然后单击“继续”。

  15. 在数据集的“设置属性”页中,输入“SinkBlobDataset”作为“名称”,然后为 LinkedService 选择“AzureStorageLinkedService”。

  16. 展开属性页的“高级”部分,然后选择“打开此数据集”。

  17. 在数据集的“连接”选项卡上,编辑“文件路径”。 输入 @pipeline().parameters.sinkBlobContainer 作为文件夹,输入 @concat(pipeline().RunId, '.txt') 作为文件名。 该表达式使用当前管道运行的 ID 作为文件名。 有关支持的系统变量和表达式列表,请参阅系统变量表达式语言

    Shows a screenshot of the Sink dataset settings.

  18. 切换回顶部的“管道”选项卡。 在搜索框中搜索“Web”,并将“Web”活动拖放到管道设计器图面。 将活动的名称设置为 SendSuccessEmailActivity。 Web 活动允许调用任何 REST 终结点。 有关该活动的详细信息,请参阅 Web 活动。 此管道使用 Web 活动调用逻辑应用电子邮件工作流。

    Shows a screenshot demonstrating how to drag and drop the first Web activity.

  19. 从“常规”选项卡切换到“设置”选项卡,然后执行以下步骤:

    1. 对于“URL”,请指定用于发送成功电子邮件的逻辑应用工作流的 URL。

    2. 为“方法”选择“POST”。

    3. 在“标头”部分中单击“+ 添加标头”链接。

    4. 添加标头 Content-Type 并将其设置为 application/json

    5. 为“正文”指定以下 JSON。

      {
          "message": "@{activity('Copy1').output.dataWritten}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      消息正文包含以下属性:

      • 消息 - @{activity('Copy1').output.dataWritten 的传递值。 访问前一复制活动的属性,并传递 dataWritten 的值。 失败时,传递错误输出而不是 @{activity('CopyBlobtoBlob').error.message

      • 数据工厂名称 - @{pipeline().DataFactory} 的传递值。这是一个系统变量,用于访问相应的数据工厂名称。 有关系统变量的列表,请参阅系统变量一文。

      • 管道名称 - @{pipeline().Pipeline} 的传递值。 这也是系统变量,用于访问相应的管道名称。

      • 接收方 -“@pipeline().parameters.receiver”的传递值。 访问管道参数。

        Shows a screenshot of the settings for the first Web activity.

  20. 将“复制”活动旁边的绿色复选框按钮拖放到“Web”活动,以便将“复制”活动连接到“Web”活动。

    Shows a screenshot demonstrating how to connect the Copy activity with the first Web activity.

  21. 将“Web”活动从“活动”工具箱拖放到管道设计器图面,并将名称设置为 SendFailureEmailActivity

    Shows a screenshot of the name of the second Web activity.

  22. 切换到“设置”选项卡,然后执行以下步骤:

    1. 对于“URL”,请指定用于发送失败电子邮件的逻辑应用工作流的 URL。

    2. 为“方法”选择“POST”。

    3. 在“标头”部分中单击“+ 添加标头”链接。

    4. 添加标头 Content-Type 并将其设置为 application/json

    5. 为“正文”指定以下 JSON。

      {
          "message": "@{activity('Copy1').error.message}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      Shows a screenshot of the settings for the second Web activity.

  23. 选择管道设计器中“复制”活动右侧的红色“X”按钮,然后将其拖放到刚创建的 SendFailureEmailActivity 上。

    Screenshot that shows how to select Error on the Copy activity in the pipeline designer.

  24. 若要验证管道,请单击工具栏中的“验证”按钮。 单击 >> 按钮关闭“管道验证输出”窗口。

    Shows a screenshot of the Validate pipeline button.

  25. 若要将实体(数据集、管道等)发布到数据工厂服务,请选择“全部发布”。 等待“已成功发布”消息出现。

    Shows a screenshot of the Publish button in the data factory portal.

触发成功的管道运行

  1. 若要触发某个管道运行,请在工具栏中单击“触发器”,然后单击“立即触发”。

    Shows a screenshot of the Trigger Now button.

  2. 在“管道运行”窗口中执行以下步骤:

    1. sourceBlobContainer 参数输入 adftutorial/adfv2branch/input

    2. sinkBlobContainer 参数输入 adftutorial/adfv2branch/output

    3. 输入接收方电子邮件地址

    4. 单击“完成”

      Pipeline run parameters

监视成功的管道运行

  1. 若要监视管道运行,请切换到左侧的“监视”选项卡。 可以看到手动触发的管道运行。 使用“刷新”按钮刷新列表。

    Successful pipeline run

  2. 若要查看与此管道运行关联的活动运行,请单击“操作”列中的第一个链接。 单击顶部的“管道”可以切换回到上一视图。 使用“刷新”按钮刷新列表。

    Screenshot that shows how to view the list of activity runs.

触发失败的管道运行

  1. 在左侧切换到“编辑”选项卡。

  2. 若要触发某个管道运行,请在工具栏中单击“触发器”,然后单击“立即触发”。

  3. 在“管道运行”窗口中执行以下步骤:

    1. sourceBlobContainer 参数输入 adftutorial/dummy/input。 请确保 adftutorial 容器中不存在 dummy 文件夹。
    2. sinkBlobContainer 参数输入 adftutorial/dummy/output
    3. 输入接收方电子邮件地址
    4. 单击“完成”。

监视失败的管道运行

  1. 若要监视管道运行,请切换到左侧的“监视”选项卡。 可以看到手动触发的管道运行。 使用“刷新”按钮刷新列表。

    Failure pipeline run

  2. 单击管道运行对应的“错误”链接可查看有关错误的详细信息。

    Pipeline error

  3. 若要查看与此管道运行关联的活动运行,请单击“操作”列中的第一个链接。 使用“刷新”按钮刷新列表。 请注意,管道中的“复制”活动失败。 “Web”活动已成功将失败电子邮件发送到指定的接收方。

    Activity runs

  4. 在“操作”列中单击“错误”链接可查看有关错误的详细信息。

    Activity run error

已在本教程中执行了以下步骤:

  • 创建数据工厂。
  • 创建 Azure 存储链接服务。
  • 创建 Azure Blob 数据集
  • 创建包含复制活动和 Web 活动的管道
  • 将活动的输出发送到后续活动
  • 利用参数传递和系统变量
  • 启动管道运行
  • 监视管道和活动运行

现在可以转到“概念”部分详细了解 Azure 数据工厂。