本文介绍如何使用 Synapse 机器学习 (SynapseML) 创建机器学习应用程序。 SynapseML 通过添加许多深度学习和数据科学工具(例如 Azure AI 服务、OpenCV、LightGBM 等)来扩展 Apache Spark 的分布式机器学习解决方案。 使用 SynapseML 可以基于各种 Spark 数据源生成功能强大且高度可缩放的预测模型和分析模型。 Synapse Spark 提供内置的 SynapseML 库,包括:
- Vowpal Wabbit – 用于机器学习的库服务,可以实现文本分析,例如在推文中进行情绪分析。
- MMLSpark: Unifying Machine Learning Ecosystems at Massive Scales(MMLSpark:大规模统一机器学习生态系统)- 在 SparkML 管道中将 Azure AI 服务的功能组合在一起,从而派生出认知数据建模服务(例如异常检测)的解决方案设计。
- LightGBM – LightGBM 是一种使用基于树的学习算法的梯度提升框架。 根据设计,它是分布式的,可以提升效率。
- 条件 KNN - 支持条件查询的可缩放 KNN 模型。
- Spark 上的 HTTP – 在集成 Spark 时用于协调分布式微服务,并提供基于 HTTP 协议的易访问性。
本教程将演示几个有关在 SynapseML 中使用 Azure AI 服务的示例,包括
- 文本分析 - 获取一组句子的情绪。
- 计算机视觉 - 获取与一组图像相关联的标记(单字说明)。
- 必应图像搜索 - 在 Web 中搜索与自然语言查询相关的图像。
- 异常检测器 - 检测时间序列数据中的异常。
如果没有 Azure 订阅,可在开始前创建一个试用帐户。
- Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
- Azure Synapse Analytics 工作区中的 Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建 Spark 池。
- 在 Azure Synapse 中配置 Azure AI 服务教程中所述的预配置步骤。
若要开始,请导入 SynapseML 并配置服务密钥。
import synapse.ml
from synapse.ml.cognitive import *
from notebookutils import mssparkutils
# An Azure AI services multi-service resource key for Text Analytics and Computer Vision (or use separate keys that belong to each service)
ai_service_key = mssparkutils.credentials.getSecretWithLS("ADD_YOUR_KEY_VAULT_LINKED_SERVICE_NAME", "ADD_YOUR_SECRET_NAME")
# An Anomaly Detector subscription key
anomalydetector_key = mssparkutils.credentials.getSecretWithLS("ADD_YOUR_KEY_VAULT_LINKED_SERVICE_NAME", "ADD_YOUR_SECRET_NAME")
文本分析服务提供了几种从文本中提取智能见解的算法。 例如,我们可以找到给定输入文本的情绪。 服务将返回介于 0.0 和 1.0 之间的分数,其中低分数表示负面情绪,高分表示正面情绪。 此示例使用三个简单的句子,并返回每个句子的情绪。
from pyspark.sql.functions import col
# Create a dataframe that's tied to it's column names
df_sentences = spark.createDataFrame([
("I am so happy today, its sunny!", "en-US"),
("this is a dog", "en-US"),
("I am frustrated by this rush hour traffic!", "en-US")
], ["text", "language"])
# Run the Text Analytics service with options
sentiment = (TextSentiment()
.setTextCol("text")
.setUrl("https://<service_region>.api.cognitive.azure.cn/text/analytics/v3.0/sentiment")
.setSubscriptionKey(ai_service_key)
.setOutputCol("sentiment")
.setErrorCol("error")
.setLanguageCol("language"))
# Show the results of your text query in a table format
display(sentiment.transform(df_sentences).select("text", col("sentiment")[0].getItem("sentiment").alias("sentiment")))
text | 情绪 |
---|---|
I am frustrated by this rush hour traffic! | 消极 |
this is a dog | 中立 |
今天天气晴朗,我真高兴! | 积极 |
计算机视觉会分析图像以识别结构,例如人脸、对象和自然语言说明。 在此示例中,我们将标记以下图像。 标记是对图像中可识别的对象、人物、风景和动作等事物的单个词说明。
# Create a dataframe with the image URL
df_images = spark.createDataFrame([
("https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/objects.jpg", )
], ["image", ])
# Run the Computer Vision service. Analyze Image extracts information from/about the images.
analysis = (AnalyzeImage()
.setUrl("https://<service_region>.api.cognitive.azure.cn/vision/v3.1/analyze")
.setSubscriptionKey(ai_service_key)
.setVisualFeatures(["Categories","Color","Description","Faces","Objects","Tags"])
.setOutputCol("analysis_results")
.setImageUrlCol("image")
.setErrorCol("error"))
# Show the results of what you wanted to pull out of the images.
display(analysis.transform(df_images).select("image", "analysis_results.description.tags"))
image | 标记 |
---|---|
https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/objects.jpg |
[skating, person, man, outdoor, riding, sport, skateboard, young, board, shirt, air, park, boy, side, jumping, ramp, trick, doing, flying] |
异常检测器对于检测时间序列数据中的不规则性非常有用。 在此示例中,我们使用该服务来查找整个时间序列中的异常情况。
from pyspark.sql.functions import lit
# Create a dataframe with the point data that Anomaly Detector requires
df_timeseriesdata = spark.createDataFrame([
("1972-01-01T00:00:00Z", 826.0),
("1972-02-01T00:00:00Z", 799.0),
("1972-03-01T00:00:00Z", 890.0),
("1972-04-01T00:00:00Z", 900.0),
("1972-05-01T00:00:00Z", 766.0),
("1972-06-01T00:00:00Z", 805.0),
("1972-07-01T00:00:00Z", 821.0),
("1972-08-01T00:00:00Z", 20000.0), # anomaly
("1972-09-01T00:00:00Z", 883.0),
("1972-10-01T00:00:00Z", 898.0),
("1972-11-01T00:00:00Z", 957.0),
("1972-12-01T00:00:00Z", 924.0),
("1973-01-01T00:00:00Z", 881.0),
("1973-02-01T00:00:00Z", 837.0),
("1973-03-01T00:00:00Z", 9000.0) # anomaly
], ["timestamp", "value"]).withColumn("group", lit("series1"))
# Run the Anomaly Detector service to look for irregular data
anomaly_detector = (SimpleDetectAnomalies()
.setSubscriptionKey(anomalydetector_key)
.setUrl("https://<service_region>.api.cognitive.azure.cn/anomalydetector/v1.0/timeseries/entire/detect")
.setTimestampCol("timestamp")
.setValueCol("value")
.setOutputCol("anomalies")
.setGroupbyCol("group")
.setGranularity("monthly"))
# Show the full results of the analysis with the anomalies marked as "True"
display(anomaly_detector.transform(df_timeseriesdata).select("timestamp", "value", "anomalies.isAnomaly"))
timestamp | 值 | isAnomaly |
---|---|---|
1972-01-01T00:00:00Z | 826.0 | false |
1972-02-01T00:00:00Z | 799.0 | false |
1972-03-01T00:00:00Z | 890.0 | false |
1972-04-01T00:00:00Z | 900.0 | false |
1972-05-01T00:00:00Z | 766.0 | false |
1972-06-01T00:00:00Z | 805.0 | false |
1972-07-01T00:00:00Z | 821.0 | false |
1972-08-01T00:00:00Z | 20000.0 | 是 |
1972-09-01T00:00:00Z | 883.0 | false |
1972-10-01T00:00:00Z | 898.0 | false |
1972-11-01T00:00:00Z | 957.0 | false |
1972-12-01T00:00:00Z | 924.0 | false |
1973-01-01T00:00:00Z | 881.0 | false |
1973-02-01T00:00:00Z | 837.0 | false |
1973-03-01T00:00:00Z | 9000.0 | 是 |
语音转文本服务将语音音频的流或文件转换为文本。 在本示例中,我们将一个音频文件转录为文本。
# Create a dataframe with our audio URLs, tied to the column called "url"
df = spark.createDataFrame([("https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav",)
], ["url"])
# Run the Speech-to-text service to translate the audio into text
speech_to_text = (SpeechToTextSDK()
.setSubscriptionKey(service_key)
.setUrl("https://<service_region>.stt.speech.azure.cn/speech/recognition/conversation/cognitiveservices/v1")
.setOutputCol("text")
.setAudioDataCol("url")
.setLanguage("en-US")
.setProfanity("Masked"))
# Show the results of the translation
display(speech_to_text.transform(df).select("url", "text.DisplayText"))
url | DisplayText |
---|---|
https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav |
自定义语音提供了一些工具,通过比较音频数据和自定义语音门户的相应识别结果,你可以直观地检查模型的识别质量。 你可以播放上传的音频,并确定提供的识别结果是否正确。 使用此工具,可以快速检查 Microsoft 的基准语音转文本模型或经过训练的自定义模型的质量,而无需转录任何音频数据。 |
为了确保关闭 Spark 实例,请结束任何已连接的会话(笔记本)。 达到 Apache Spark 池中指定的空闲时间时,池将会关闭。 也可以从笔记本右上角的状态栏中选择“停止会话”。