Azure Machine Learning Studio (classic) integration in Stream Analytics (Preview)

Stream Analytics supports user-defined functions that call out to Azure Machine Learning Studio (classic) endpoints. REST API support for this feature is detailed in the Stream Analytics REST API library. This article provides the supplemental information needed to implement this capability successfully in Stream Analytics. A tutorial has also been published.

Overview: Azure Machine Learning Studio (classic) terminology

Microsoft Azure Machine Learning Studio (classic) provides a collaborative, drag-and-drop tool that you can use to build, test, and deploy predictive analytics solutions on your data. The studio is used to interact with the Machine Learning resources and to build, test, and iterate on your design. These resources are defined below.

  • Workspace: The workspace is a container that holds all other Machine Learning resources together for management and control.
  • Experiment: Experiments are created by data scientists to use datasets and train a machine learning model.
  • Endpoint: Endpoints are the Azure Machine Learning Studio (classic) objects that take features as input, apply a specified machine learning model, and return scored output.
  • Scoring web service: A scoring web service is a collection of endpoints as described above.

Each endpoint has APIs for batch execution and synchronous execution. Stream Analytics uses synchronous execution. In Microsoft Azure Machine Learning Studio (classic), this service is called a Request/Response Service.

Machine Learning resources needed for Stream Analytics jobs

For Stream Analytics job processing, a Request/Response endpoint, an apikey, and a swagger definition are all required for successful execution. Stream Analytics has an additional endpoint that constructs the URL for the swagger endpoint, looks up the interface, and returns a default UDF definition to the user.

Configure a Stream Analytics and Machine Learning UDF via REST API

By using REST APIs, you can configure your job to call Azure Machine Learning functions. The steps are as follows:

  1. Create a Stream Analytics job
  2. Define an input
  3. Define an output
  4. Create a user-defined function (UDF)
  5. Write a Stream Analytics transformation that calls the UDF
  6. Start the job

Creating a UDF with basic properties

As an example, the following sample code creates a scalar UDF named newudf that binds to an Azure Machine Learning Studio (classic) endpoint. The endpoint (service URI) can be found on the API help page for the chosen service, and the apiKey can be found on the Services main page.

    PUT : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>

Example request body:

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77fb4b46bf2a30c63c078dca/services/b7be5e40fd194258796fb402c1958eaf/execute",
                        "apiKey": "replacekeyhere"
                    }
                }
            }
        }
    }
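As a rough illustration, the PUT call above could be assembled in Python as follows. This is only a sketch: the `https://management.azure.com` base URL and the bearer-token `Authorization` header are assumptions about how the Azure management API is reached (the article shows only the path), and the helper names `function_url`, `skeleton_udf_body`, and `build_put_request` are hypothetical. The request object is built but not sent.

```python
import json
import urllib.request

# Assumption: Azure Resource Manager base URL; the article shows only the path.
ARM_BASE = "https://management.azure.com"


def function_url(subscription_id, resource_group, job_name, udf_name, api_version):
    """Fill in the path template shown in the article for a UDF resource."""
    path = (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.StreamAnalytics/streamingjobs/{job_name}"
        f"/functions/{udf_name}"
    )
    return f"{ARM_BASE}{path}?api-version={api_version}"


def skeleton_udf_body(udf_name, endpoint, api_key):
    """Build the skeleton scalar UDF body bound to a Request/Response endpoint."""
    return {
        "name": udf_name,
        "properties": {
            "type": "Scalar",
            "properties": {
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    # strip() guards against stray whitespace around a pasted URI
                    "properties": {"endpoint": endpoint.strip(), "apiKey": api_key},
                }
            },
        },
    }


def build_put_request(url, body, bearer_token):
    """Prepare (but do not send) the PUT request carrying the JSON body."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: the management API expects a bearer token.
            "Authorization": f"Bearer {bearer_token}",
        },
        method="PUT",
    )
```

Passing the prepared request to `urllib.request.urlopen` would send it; that step is left out here because it needs real credentials and a real api-version value.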

Call RetrieveDefaultDefinition endpoint for default UDF

Once the skeleton UDF is created, the complete definition of the UDF is needed. The RetrieveDefaultDefinition endpoint helps you get the default definition for a scalar function that is bound to an Azure Machine Learning Studio (classic) endpoint. The payload below requests the default UDF definition for such a function. It doesn't specify the actual endpoint, because the endpoint was already provided in the PUT request. Stream Analytics calls the endpoint given in the request if one is provided explicitly; otherwise it uses the one originally referenced. Here the UDF takes a single string parameter (a sentence) and returns a single output of type string that indicates the "sentiment" label for that sentence.

    POST : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>/RetrieveDefaultDefinition?api-version=<apiVersion>

Example request body:

    {
        "bindingType": "Microsoft.MachineLearning/WebService",
        "bindingRetrievalProperties": {
            "executeEndpoint": null,
            "udfType": "Scalar"
        }
    }
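The request body above can also be generated programmatically. The following is a minimal sketch in which the helper name `retrieve_default_definition_body` is hypothetical; leaving `execute_endpoint` as `None` serializes to `null`, which matches the case where the endpoint from the earlier PUT request is reused.

```python
import json


def retrieve_default_definition_body(execute_endpoint=None, udf_type="Scalar"):
    """Build the RetrieveDefaultDefinition payload shown in the article.

    execute_endpoint=None becomes JSON null, meaning no endpoint is
    given explicitly in this request.
    """
    return {
        "bindingType": "Microsoft.MachineLearning/WebService",
        "bindingRetrievalProperties": {
            "executeEndpoint": execute_endpoint,
            "udfType": udf_type,
        },
    }


payload = json.dumps(retrieve_default_definition_body(), indent=4)
print(payload)
```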

A sample output looks similar to the following.

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "inputs": [{
                    "dataType": "nvarchar(max)",
                    "isConfigurationParameter": null
                }],
                "output": {
                    "dataType": "nvarchar(max)"
                },
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
                        "apiKey": null,
                        "inputs": {
                            "name": "input1",
                            "columnNames": [{
                                "name": "tweet",
                                "dataType": "string",
                                "mapTo": 0
                            }]
                        },
                        "outputs": [{
                            "name": "Sentiment",
                            "dataType": "string"
                        }],
                        "batchSize": 10
                    }
                }
            }
        }
    }
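Before patching the UDF, it can help to check what the returned definition actually binds. The sketch below walks a response shaped like the sample output and summarizes the UDF signature and the mapped Machine Learning columns. The helper name `summarize_binding` is hypothetical, and the embedded `sample` is a trimmed copy of the sample output with a placeholder endpoint.

```python
def summarize_binding(definition):
    """Summarize a RetrieveDefaultDefinition response shaped like the sample."""
    props = definition["properties"]["properties"]
    binding = props["binding"]["properties"]
    return {
        "udf_input_types": [i["dataType"] for i in props["inputs"]],
        "udf_output_type": props["output"]["dataType"],
        "ml_input_columns": [c["name"] for c in binding["inputs"]["columnNames"]],
        "ml_outputs": [o["name"] for o in binding["outputs"]],
        "batch_size": binding["batchSize"],
    }


# Trimmed copy of the sample output; the endpoint is a placeholder.
sample = {
    "name": "newudf",
    "properties": {
        "type": "Scalar",
        "properties": {
            "inputs": [{"dataType": "nvarchar(max)", "isConfigurationParameter": None}],
            "output": {"dataType": "nvarchar(max)"},
            "binding": {
                "type": "Microsoft.MachineLearning/WebService",
                "properties": {
                    "endpoint": "https://example.invalid/execute",
                    "apiKey": None,
                    "inputs": {
                        "name": "input1",
                        "columnNames": [{"name": "tweet", "dataType": "string", "mapTo": 0}],
                    },
                    "outputs": [{"name": "Sentiment", "dataType": "string"}],
                    "batchSize": 10,
                },
            },
        },
    },
}

summary = summarize_binding(sample)
print(summary["ml_input_columns"], summary["ml_outputs"])
```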

Patch UDF with the response

Now the UDF must be patched with the previous response, as shown below.

    PATCH : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>

Request body (output from RetrieveDefaultDefinition):

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "inputs": [{
                    "dataType": "nvarchar(max)",
                    "isConfigurationParameter": null
                }],
                "output": {
                    "dataType": "nvarchar(max)"
                },
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
                        "apiKey": null,
                        "inputs": {
                            "name": "input1",
                            "columnNames": [{
                                "name": "tweet",
                                "dataType": "string",
                                "mapTo": 0
                            }]
                        },
                        "outputs": [{
                            "name": "Sentiment",
                            "dataType": "string"
                        }],
                        "batchSize": 10
                    }
                }
            }
        }
    }
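Because the PATCH body is the RetrieveDefaultDefinition output, it can be derived from the parsed response rather than retyped. The following is a minimal sketch with a hypothetical helper, `build_patch_body`. The optional `api_key` argument is an assumption added for illustration: the sample body keeps `apiKey` as null, and the article does not say whether the key needs to be supplied again.

```python
import copy


def build_patch_body(default_definition, api_key=None):
    """Return a PATCH body based on a RetrieveDefaultDefinition response."""
    # Deep copy so the caller's parsed response is left untouched.
    body = copy.deepcopy(default_definition)
    if api_key is not None:
        # Assumption: the key may be set here; the article's sample leaves it null.
        body["properties"]["properties"]["binding"]["properties"]["apiKey"] = api_key
    return body


# Trimmed stand-in for a parsed response; the endpoint is a placeholder.
response = {
    "name": "newudf",
    "properties": {
        "type": "Scalar",
        "properties": {
            "binding": {
                "type": "Microsoft.MachineLearning/WebService",
                "properties": {
                    "endpoint": "https://example.invalid/execute",
                    "apiKey": None,
                    "batchSize": 10,
                },
            }
        },
    },
}

patch_body = build_patch_body(response)
```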

Implement Stream Analytics transformation to call the UDF

Now query the UDF (named scoreTweet in this query) for every input event and write a response for that event to an output. The function name used in the query must match the name of the UDF you created; the earlier examples used newudf.

    {
        "name": "transformation",
        "properties": {
            "streamingUnits": null,
            "query": "select *,scoreTweet(Tweet) TweetSentiment into blobOutput from blobInput"
        }
    }
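The transformation body can be built from its parts so that the function name in the query stays in step with the UDF name. This is a sketch only: the helper `transformation_body` and its parameter names are hypothetical, and the values passed in are the ones used in the query above.

```python
import json


def transformation_body(udf_name, column, alias, input_name, output_name):
    """Build a transformation body whose query calls the UDF once per event."""
    query = (
        f"select *,{udf_name}({column}) {alias} "
        f"into {output_name} from {input_name}"
    )
    return {
        "name": "transformation",
        "properties": {"streamingUnits": None, "query": query},
    }


body = transformation_body(
    "scoreTweet", "Tweet", "TweetSentiment", "blobInput", "blobOutput"
)
print(json.dumps(body, indent=4))
```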

Get help

For further assistance, try the Microsoft Q&A question page for Azure Stream Analytics.

Next steps