教程:使用 SynapseML 和 Azure AI 搜索为 Apache Spark 中的大数据编制索引

本 Azure AI 搜索教程介绍如何为从 Spark 群集加载的大型数据编制索引和查询。 设置一个 Jupyter Notebook 来执行以下操作:

  • 在 Apache Spark 会话中将各种表单(发票)加载到数据帧
  • 分析它们以确定其功能
  • 将生成的输出组合到表格数据结构中
  • 将输出写入 Azure AI 搜索中托管的搜索索引
  • 浏览和查询所创建的内容

尽管 Azure AI 搜索具有本机 AI 扩充,本教程显示如何访问 Azure AI 搜索之外的 AI 功能。 通过使用 SynapseML 而不是索引器或技能,你不会受到数据限制或与这些对象相关的其他约束。

先决条件

需要 synapseml 库和几个 Azure 资源。 如果可能,请为 Azure 资源使用相同的订阅和区域,并将所有内容放入一个资源组中,便于稍后清理。 以下链接用于门户安装。 示例数据是从公共站点导入的。

1 此链接是加载包的教程。

2 可以使用免费搜索层为示例数据编制索引,但如果数据量很大,请选择更高的层。 对于可计费层级,请在接下来的设置依赖项步骤中提供搜索 API 密钥

3 本教程使用 Azure AI 文档智能和 Azure AI 翻译。 在以下说明中,提供多服务密钥和区域。 同一个密钥适用于这两个服务。

4 在本教程中,Azure Databricks 提供 Spark 计算平台。 我们按照门户说明设置了工作区。

注意

上述所有 Azure 资源都支持 Azure 标识平台中的安全功能。 为简单起见,本教程假定使用从每个服务的 Azure 门户页复制的终结点和密钥进行基于密钥的身份验证。 如果在生产环境中实现此工作流,或与他人共享解决方案,请记住将硬编码密钥替换为集成安全性或加密密钥。

步骤 1:创建 Spark 群集和笔记本

在本部分中,创建群集、安装 synapseml 库并创建用于运行代码的笔记本。

  1. 在 Azure 门户中,找到你的 Azure Databricks 工作区并选择“启动工作区”。

  2. 在左侧菜单中,选择“计算”。

  3. 选择“创建计算”。

  4. 接受默认配置。 创建群集需要几分钟时间。

  5. 创建群集后安装 synapseml 库:

    1. 从群集页面顶部的选项卡中选择“库”。

    2. 选择“新安装”。

      “新安装”命令的屏幕截图。

    3. 选择“Maven”。

    4. 在坐标中,输入 com.microsoft.azure:synapseml_2.12:1.0.4

    5. 选择“安装” 。

  6. 在左侧菜单中,选择“创建”>“笔记本”。

    “创建笔记本”命令的屏幕截图。

  7. 为笔记本指定一个名称,选择 Python 作为默认语言,然后选择具有 synapseml 库的群集。

  8. 创建七个连续单元格。 将代码粘贴到每个单元格中。

    带有占位符单元格的笔记本屏幕截图。

步骤 2:设置依赖项

将以下代码粘贴到笔记本的第一个单元格中。

将占位符替换为每个资源的终结点和访问密钥。 提供新搜索索引的名称。 不需要进行其他修改,因此准备就绪后便可运行代码。

此代码导入多个包并设置对此工作流中使用的 Azure 资源的访问权限。

import os
from pyspark.sql.functions import udf, trim, split, explode, col, monotonically_increasing_id, lit
from pyspark.sql.types import StringType
from synapse.ml.core.spark import FluentAPI

cognitive_services_key = "placeholder-cognitive-services-multi-service-key"
cognitive_services_region = "placeholder-cognitive-services-region"

search_service = "placeholder-search-service-name"
search_key = "placeholder-search-service-api-key"
search_index = "placeholder-search-index-name"

步骤 3:将数据加载到 Spark 中

将以下代码粘贴到第二个单元格中。 无需修改,因此准备就绪后便可运行代码。

此代码从 Azure 存储帐户加载一些外部文件。 这些文件是各种发票,可读入数据帧。

def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)

df2 = (spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.chinacloudapi.cn/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache())

display(df2)

步骤 4:添加文档智能

将以下代码粘贴到第三个单元格中。 无需修改,因此准备就绪后便可运行代码。

此代码加载 AnalyzeInvoices 转换器,并传递对包含发票的数据帧的引用。 它调用 Azure Forms Analyzer 的预生成的发票模型

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (AnalyzeInvoices()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache())

display(analyzed_df)

此步骤的输出应如下个屏幕截图所示。 请注意表单分析如何打包成难处理的密集结构化列。 下一个转换通过将列解析为行和列来解决此问题。

AnalyzeInvoices 输出的屏幕截图。

步骤 5:重构文档智能输出

将以下代码粘贴到第四个单元格并运行。 无需进行任何修改。

此代码加载 FormOntologyLearner,这是一个分析文档智能转换器的输出并推断表格数据结构的转换器。 AnalyzeInvoices 的输出是动态的,具体因内容中检测到的特征而异。 此外,转换器将输出合并到单个列中。 由于输出是动态的且是合并的,因此很难在需要更多结构的下游转换中使用该输出。

FormOntologyLearner 通过查找可用于创建表格数据结构的模式扩展了 AnalyzeInvoices 转换器的实用性。 通过将输出组织成多列和多行,使内容可供在其他转换器(例如 AzureSearchWriter)中使用。

from synapse.ml.cognitive import FormOntologyLearner

itemized_df = (FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*").select("*", explode(col("Items")).alias("Item"))
    .drop("Items").select("Item.*", "*").drop("Item"))

display(itemized_df)

请注意,此转换如何将嵌套字段重新广播到一个表中,从而启用接下来的两个转换。 为了简洁起见,此屏幕截图已剪裁。 如果使用自己的笔记本继续操作,你将有 19 列和 26 行。

FormOntologyLearner 输出的屏幕截图。

步骤 6:添加翻译

将以下代码粘贴到第五个单元格中。 无需修改,因此准备就绪后便可运行代码。

此代码加载 Translate,这是一个调用 Azure AI 服务中的 Azure AI 翻译服务的转换器。 “说明”一列的英文原文被机器翻译成各种语言。 所有输出都合并到“output.translations”数组中。

from synapse.ml.cognitive import Translate

translated_df = (Translate()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache())

display(translated_df)

提示

若要查看已翻译的字符串,请滚动到行的末尾。

表输出的屏幕截图,其中显示了 Translations 列。

步骤 7:使用 AzureSearchWriter 添加搜索索引

将以下代码粘贴到第六个单元格中,然后运行它。 无需进行任何修改。

此代码加载 AzureSearchWriter。 它使用表格数据集,并推断一个搜索索引架构,该架构为每个列定义一个字段。 翻译结构是一个数组,因此它在索引中被表述为一个复杂的集合,其中每个语言翻译都有子字段。 生成的索引具有文档键,并使用通过创建索引 REST API 创建的字段的默认值。

from synapse.ml.cognitive import *

(translated_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    ))

可以查看 Azure 门户中的搜索服务页面,以探索由 AzureSearchWriter 创建的索引定义。

注意

如果不能使用默认搜索索引,可以在 JSON 中提供外部自定义定义,将其 URI 作为字符串传递到“indexJson”属性中。 首先生成默认索引,以确保知道要指定哪些字段,然后生成自定义属性,例如在需要特定分析器时生成自定义属性。

步骤 8:查询索引

将以下代码粘贴到第七个单元格中,然后运行它。 无需进行任何修改,但你可能希望更改语法或尝试更多示例以进一步探索你的内容:

没有发出查询的转换器或模块。 此单元格是对搜索文档 REST API 的简单调用。

此特定示例搜索“door”一词 ("search": "door")。 它还返回匹配文档的“计数”,并只为结果选择“说明”和“翻译”字段的内容。 如果要查看字段的完整列表,请删除“select”参数。

import requests

url = "https://{}.search.azure.cn/indexes/{}/docs/search?api-version=2024-07-01".format(search_service, search_index)
requests.post(url, json={"search": "door", "count": "true", "select": "Description, Translations"}, headers={"api-key": search_key}).json()

以下屏幕截图显示示例脚本的单元格输出。

显示计数、搜索字符串和返回字段的查询结果屏幕截图。

清理资源

在自己的订阅中操作时,最好在项目结束时删除不再需要的资源。 持续运行资源可能会产生费用。 可以逐个删除资源,也可以删除资源组以删除整个资源集。

你可以在 Azure 门户中查找和管理资源,只需使用左侧导航窗格中的“所有资源”或“资源组”链接即可。

后续步骤

本教程介绍 SynapseML 中的 AzureSearchWriter转换器,这是一种在 Azure AI 搜索中创建和加载搜索索引的新方法。 此转换器采用结构化 JSON 作为输入。 FormOntologyLearner 可以为 SynapseML 中的文档智能转换器生成的输出提供必要的结构。

下一步,请查看其他 SynapseML 教程,这些教程可生成你想要通过 Azure AI 搜索来探索的转换内容: