Azure AI 服务通过现成的预生成可定制 API 和模型,帮助开发人员和组织快速创建智能、前沿、面向市场且负责任的应用程序。
使用 SynapseML 可以基于各种 Spark 数据源生成功能强大且高度可缩放的预测模型和分析模型。 Synapse Spark 提供内置的 SynapseML 库,包括 synapse.ml.services。
重要
从 2023 年 9 月 20 日开始,无法创建新的异常检测器资源。 异常检测器服务将于 2026 年 10 月 1 日停用。
本教程(在 Azure Synapse 中使用 Azure AI 服务的先决条件)指导你完成在 Synapse Analytics 中使用 Azure AI 服务之前需要执行的几个步骤。
Azure AI 服务是一套集 API、SDK 和服务的套件,开发人员可以使用它们向其应用程序添加智能功能。 即使开发人员没有直接的 AI 或数据科学技能或知识,AI 服务也能为他们提供帮助。 Azure AI 服务帮助开发人员创建可以看、听、说、理解甚至开始推理的应用程序。 Azure AI 服务中的服务目录可分为五大主要支柱类别:视觉、语音、语言、Web 搜索和决策。
- 描述:以用户可读的语言提供图像描述(Scala、Python)
- 分析(颜色、图像类型、人脸、成人/猥亵内容):分析图像的视觉特征(Scala、Python)
- OCR:读取图像中的文本(Scala、Python)
- 识别文本:读取图像中的文本(Scala、Python)
- 缩略图:从图像生成用户指定大小的缩略图(Scala、Python)
- 识别特定于领域的内容:识别特定于领域的内容(名人、地标)(Scala、Python)
- 标记:标识与输入图像相关的单词列表(Scala、Python)
- 语言检测:检测输入文本的语言(Scala、Python)
- 关键短语提取:识别输入文本中的关键论题(Scala、Python)
- 命名实体识别:识别输入文本中的已知实体和常规命名实体(Scala、Python)
- 情绪分析:返回介于 0 和 1 之间的分数,指示输入文本中的情绪(Scala、Python)
- 医疗保健实体提取:从文本中提取医疗实体和关系。 (Scala、Python)
- 翻译:翻译文本。 (Scala、Python)
- 音译:将一种语言的文本从一个脚本转换为另一个脚本。 (Scala、Python)
- 检测:识别一段文本的语言。 (Scala、Python)
- 断句:识别文本段中的句子边界的位置。 (Scala、Python)
- 字典查找:为某个单词和少量的惯用语提供替代翻译。 (Scala、Python)
- 字典示例:提供示例,说明如何在上下文中使用字典中的术语。 (Scala、Python)
- 文档翻译:翻译所有支持的语言和方言的文档,同时保留文档结构和数据格式。 (Scala、Python)
- 分析布局:从给定文档中提取文本和布局信息。 (Scala、Python)
- 分析收据:使用光学字符识别 (OCR) 和我们的收据模型从收据中检测和提取数据,这让你可以轻松地从收据中提取结构化数据,例如商店名称、商家电话号码、交易日期、交易总额等。 (Scala、Python)
- 分析名片:使用光学字符识别 (OCR) 和名片模型从名片中检测和提取数据,这让你可以轻松地从名片中提取结构化数据,例如联系人姓名、公司名称、电话号码、电子邮件等。 (Scala、Python)
- 分析发票:使用光学字符识别 (OCR) 以及我们的发票理解深度学习模型从发票中检测和提取数据,这让你可以轻松地从发票中提取结构化数据,例如客户、供应商、发票 ID、发票截止日期、总金额、发票应付金额、税额、发货人、帐单付款、行项等。 (Scala、Python)
- 分析 ID 文档:使用光学字符识别 (OCR) 和我们的 ID 文档模型从标识文档中检测和提取数据,从而轻松从 ID 文档中提取结构化数据,例如名字、姓氏、出生日期、文档编号等。 (Scala、Python)
- 分析自定义表单:根据从一组有代表性的训练表单创建的模型,将表单(PDF 和图像)中的信息提取到结构化数据中。 (Scala、Python)
- 获取自定义模型:获取有关自定义模型的详细信息。 (Scala、Python)
- 列出自定义模型:获取有关所有自定义模型的信息。 (Scala、Python)
首先,导入所需的库并初始化 Spark 会话。
from pyspark.sql.functions import udf, col
from synapse.ml.io.http import HTTPTransformer, http_udf
from requests import Request
from pyspark.sql.functions import lit
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col
导入 Azure AI 服务库,并将以下代码片段中的密钥和位置替换为 Azure AI 服务密钥和位置。
from synapse.ml.services import *
from synapse.ml.core.platform import *
# A general AI services key for AI Language, Computer Vision and Document Intelligence (or use separate keys that belong to each service)
service_key = find_secret(
secret_name="ai-services-api-key", keyvault="mmlspark-build-keys"
) # Replace the call to find_secret with your key as a python string. e.g. service_key="27snaiw..."
service_loc = "chinanorth3"
# An Anomaly Detector subscription key
anomaly_key = find_secret(
secret_name="anomaly-api-key", keyvault="mmlspark-build-keys"
) # Replace the call to find_secret with your key as a python string. If you don't have an anomaly detection resource created before Sep 20th 2023, you won't be able to create one.
anomaly_loc = "chinanorth2"
# A Translator subscription key
translator_key = find_secret(
secret_name="translator-key", keyvault="mmlspark-build-keys"
) # Replace the call to find_secret with your key as a python string.
translator_loc = "chinaeast2"
AI 语言服务提供了几种从文本中提取智能见解的算法。 例如,我们可以找到给定输入文本的情绪。 服务将返回介于 0.0 和 1.0 之间的分数,其中低分数表示负面情绪,高分表示正面情绪。 此示例使用三个简单的句子,并返回每个句子的情绪。
# Create a dataframe that's tied to it's column names
df = spark.createDataFrame(
[
("I am so happy today, its sunny!", "en-US"),
("I am frustrated by this rush hour traffic", "en-US"),
("The AI services on spark aint bad", "en-US"),
],
["text", "language"],
)
# Run the Text Analytics service with options
sentiment = (
AnalyzeText()
.setKind("SentimentAnalysis")
.setTextCol("text")
.setLocation(service_loc)
.setSubscriptionKey(service_key)
.setEndpoint("")
.setOutputCol("sentiment")
.setErrorCol("error")
.setLanguageCol("language")
)
# Show the results of your text query in a table format
display(
sentiment.transform(df).select(
"text", col("sentiment.documents.sentiment").alias("sentiment")
)
)
健康服务文本分析从非结构化文本(如医生处方、出院小结、临床文档和电子健康记录)中提取和标记相关的医疗信息。
以下代码示例分析医生笔记中的文本并将其转换为结构化数据。
df = spark.createDataFrame(
[
("20mg of ibuprofen twice a day",),
("1tsp of Tylenol every 4 hours",),
("6-drops of Vitamin B-12 every evening",),
],
["text"],
)
healthcare = (
AnalyzeHealthText()
.setSubscriptionKey(service_key)
.setLocation(service_loc)
.setEndpoint("")
.setLanguage("en")
.setOutputCol("response")
)
display(healthcare.transform(df))
翻译器是一种基于云的机器翻译服务,是用于构建智能应用的 Azure AI 服务系列 AI API 的一部分。 “翻译”可以轻松地集成到应用程序、网站、工具和解决方案中。 它可用来在 90 种语言和方言中添加多语言用户体验,并可用来翻译文本,无需托管你自己的算法。
下面的代码示例中,通过提供要翻译的句子和要翻译到的目标语言来执行简单文本翻译。
from pyspark.sql.functions import col, flatten
# Create a dataframe including sentences you want to translate
df = spark.createDataFrame(
[(["Hello, what is your name?", "Bye"],)],
[
"text",
],
)
# Run the Translator service with options
translate = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_loc)
.setEndpoint("")
.setTextCol("text")
.setToLanguage(["zh-Hans"])
.setOutputCol("translation")
)
# Show the results of the translation.
display(
translate.transform(df)
.withColumn("translation", flatten(col("translation.translations")))
.withColumn("translation", col("translation.text"))
.select("translation")
)
Azure AI 文档智能是 Azure 应用 AI 服务的一部分,可让用户使用机器学习技术构建自动化数据处理软件。 借助 Azure AI 文档智能,可以标识并提取文档中的文本、键/值对、选择标记、表和结构。 该服务会输出结构化数据,其中包含原始文件中的关系、边界框、置信度,等等。
下面的代码示例分析名片图像,并将上面的信息提取到结构化数据中。
from pyspark.sql.functions import col, explode
# Create a dataframe containing the source files
imageDf = spark.createDataFrame(
[
(
"https://mmlspark.blob.core.windows.net/datasets/FormRecognizer/business_card.jpg",
)
],
[
"source",
],
)
# Run the Form Recognizer service
analyzeBusinessCards = (
AnalyzeBusinessCards()
.setSubscriptionKey(service_key)
.setLocation(service_loc)
.setEndpoint("")
.setImageUrlCol("source")
.setOutputCol("businessCards")
)
# Show the results of recognition.
display(
analyzeBusinessCards.transform(imageDf)
.withColumn(
"documents", explode(col("businessCards.analyzeResult.documentResults.fields"))
)
.select("source", "documents")
)
Azure AI 视觉会分析图像以识别结构,例如人脸、对象和自然语言说明。
以下代码示例分析图像并使用标签标记图像。 标记是对图像中可识别的对象、人物、风景和动作等事物的单个词说明。
# Create a dataframe with the image URLs
base_url = "https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/"
df = spark.createDataFrame(
[
(base_url + "objects.jpg",),
(base_url + "dog.jpg",),
(base_url + "house.jpg",),
],
[
"image",
],
)
# Run the Computer Vision service. Analyze Image extracts information from/about the images.
analysis = (
AnalyzeImage()
.setLocation(service_loc)
.setSubscriptionKey(service_key)
.setEndpoint("")
.setVisualFeatures(
["Categories", "Color", "Description", "Faces", "Objects", "Tags"]
)
.setOutputCol("analysis_results")
.setImageUrlCol("image")
.setErrorCol("error")
)
# Show the results of what you wanted to pull out of the images.
display(analysis.transform(df).select("image", "analysis_results.description.tags"))
语音转文本服务将语音音频的流或文件转换为文本。 以下代码示例将一个音频文件转录为文本。
# Create a dataframe with our audio URLs, tied to the column called "url"
df = spark.createDataFrame(
[("https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav",)], ["url"]
)
# Run the Speech-to-text service to translate the audio into text
speech_to_text = (
SpeechToTextSDK()
.setSubscriptionKey(service_key)
.setLocation(service_loc)
.setEndpointId("")
.setOutputCol("text")
.setAudioDataCol("url")
.setLanguage("en-US")
.setProfanity("Masked")
)
# Show the results of the translation
display(speech_to_text.transform(df).select("url", "text.DisplayText"))
文本转语音服务允许用户从 119 种语言和变体的 270 多种神经语音中进行选择,以构建讲述自然语言的应用和服务。
下面的代码示例将文本转换为包含文本内容的音频文件。
from synapse.ml.services.speech import TextToSpeech
fs = ""
if running_on_databricks():
fs = "dbfs:"
elif running_on_synapse_internal():
fs = "Files"
# Create a dataframe with text and an output file location
df = spark.createDataFrame(
[
(
"Reading out loud is fun! Check out aka.ms/spark for more information",
fs + "/output.mp3",
)
],
["text", "output_file"],
)
tts = (
TextToSpeech()
.setSubscriptionKey(service_key)
.setTextCol("text")
.setLocation(service_loc)
.setUrl("https://<service_loc>.tts.speech.azure.cn/cognitiveservices/v1")
.setVoiceName("en-US-JennyNeural")
.setOutputFileCol("output_file")
)
# Check to make sure there were no errors during audio creation
display(tts.transform(df))
如果你在 2023 年 9 月 20 日之前没有创建异常检测资源,你现在将无法创建。 你可能想要跳过此部分。
异常检测器对于检测时间序列数据中的不规则性非常有用。 下面的代码示例使用异常检测器服务来查找某个时序中存在的异常。
# Create a dataframe with the point data that Anomaly Detector requires
df = spark.createDataFrame(
[
("1972-01-01T00:00:00Z", 826.0),
("1972-02-01T00:00:00Z", 799.0),
("1972-03-01T00:00:00Z", 890.0),
("1972-04-01T00:00:00Z", 900.0),
("1972-05-01T00:00:00Z", 766.0),
("1972-06-01T00:00:00Z", 805.0),
("1972-07-01T00:00:00Z", 821.0),
("1972-08-01T00:00:00Z", 20000.0),
("1972-09-01T00:00:00Z", 883.0),
("1972-10-01T00:00:00Z", 898.0),
("1972-11-01T00:00:00Z", 957.0),
("1972-12-01T00:00:00Z", 924.0),
("1973-01-01T00:00:00Z", 881.0),
("1973-02-01T00:00:00Z", 837.0),
("1973-03-01T00:00:00Z", 9000.0),
],
["timestamp", "value"],
).withColumn("group", lit("series1"))
# Run the Anomaly Detector service to look for irregular data
anomaly_detector = (
SimpleDetectAnomalies()
.setSubscriptionKey(anomaly_key)
.setLocation(anomaly_loc)
.setEndpoint("")
.setTimestampCol("timestamp")
.setValueCol("value")
.setOutputCol("anomalies")
.setGroupbyCol("group")
.setGranularity("monthly")
)
# Show the full results of the analysis with the anomalies marked as "True"
display(
anomaly_detector.transform(df).select("timestamp", "value", "anomalies.isAnomaly")
)
借助 Spark 上的 HTTP,可以在大数据管道中使用任何 Web 服务。 在此示例中,我们使用 World Bank API 获取有关全球各个国家/地区的信息。
# Use any requests from the python requests library
def world_bank_request(country):
return Request(
"GET", "http://api.worldbank.org/v2/country/{}?format=json".format(country)
)
# Create a dataframe with specifies which countries/regions we want data on
df = spark.createDataFrame([("br",), ("usa",)], ["country"]).withColumn(
"request", http_udf(world_bank_request)(col("country"))
)
# Much faster for big data because of the concurrency :)
client = (
HTTPTransformer().setConcurrency(3).setInputCol("request").setOutputCol("response")
)
# Get the body of the response
def get_response_body(resp):
return resp.entity.content.decode()
# Show the details of the country/region data returned
display(
client.transform(df).select(
"country", udf(get_response_body)(col("response")).alias("response")
)
)