使用 Apache Spark REST API 将远程作业提交到已启用 Entra 的 HDInsight Spark 群集

了解如何使用 Apache Livy(Apache Spark REST API)将远程作业提交到 Azure HDInsight Spark 群集。 有关详细文档,请参阅 Apache Livy

可以使用 Livy 运行交互式 Spark shell 或提交要在 Spark 上运行的批处理作业。 本文介绍如何使用 Livy 提交批处理作业。 本文中的代码片段使用 cURL 对 Livy Spark 终结点进行 REST API 调用。

先决条件

在 HDInsight 上启用了 Entra 的 Apache Spark 群集。 有关说明,请参阅 在 Azure HDInsight 中创建 Apache Spark 群集

设置(安全持有者访问令牌)

需要持有者令牌才能发送 cURL 或任何 REST 通信。 可以按照下面提到的步骤获取令牌:

使用以下规范对 OAuth 2.0 令牌终结点执行 HTTP GET 请求:

URL

https://login.chinacloudapi.cn/{Tenant_ID}/oauth2/v2.0/token

身体

参数 Description 必需
授权类型 (grant_type) 必须设置为“client_credentials”
客户编号 Entra 应用注册的应用程序(客户端)ID
client_secret 生成的客户端机密或证书
作用域 带有 .default 后缀的资源 URL

cURL 请求

curl --request GET \
  --url https://login.chinacloudapi.cn/{tenant_id}/oauth2/v2.0/token \
  --header 'Content-Type: multipart/form-data' \
  --form grant_type=client_credentials \
  --form client_id={app_id} \
  --form client_secret={client_secret} \
  --form scope=https://{clustername}.clusteraccess.azurehdinsight.cn/.default \

响应

成功的请求返回一个 JSON 对象,其中包含:

  • token_type:始终为“Bearer”
  • expires_in:令牌有效期(以秒为单位)
  • ext_expires_in:延长过期时间(以秒为单位)
  • access_token:用于身份验证的持有者令牌
{
	"token_type": "Bearer",
	"expires_in": 3599,
	"ext_expires_in": 3599,
	"access_token": "eyJ0eXAiOiJKV1iLCJub25jZSI6IkhaZ3lqQ2MxSkxzaXRSbmxzT1FTSHV0bEtBeXhhMU1JTzdyWmluLWF6LUEiLCJhbGciOiJSUzI1NiIsIng1dCI6ImltaTBZMnowZFlLeEJ0dEFxS19UdDVoWUJUayIsImtpZCI6ImltaTBZMnowZFlLeEJ0dEFxS19UdDVoWUJUayJ9.eyJhdWQiOiJodHRwczovL2dyYXBoLm1pY3Jvc29mdC5jb20iLCJpc3MiOiJodHRwczovL3N0cy53aW5kb3dzLm5ldC8wY2QzZGY5OS1lMDJmLTRmZDgtYTdkOC0zYjE5ZWVhZGFiYTUvIiwiaWF0IjoxNzQxMjgzMzUzLCJuYmYiOjE3NDEyODMzNTMsImV4cCI6MTc0MTI4NzI1MywiYWlvIjoiazJSZ1lIRDF1U1R4NGx2bjdmMTdGcXlkZUdwWlBnQT0iLCJhcHBfZGlzcGxheW5hbWUiOiJBenVyZSBIREkgTVNGVCBDbGllbnQiLCJhcHBpZCI6IjAzZDNiNTg5LWFjM2MtNDE4NC1iY2EyLTQ3ZWRiN2Q2ZmVjNiIsImFwcGlkYWNyIjoiMSIsImlkcCI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0LzBjZDNkZjk5LWUwMmYtNGZkOC1hN2Q4LTNiMTllZWFkYWJhNS8iLCJpZHR5cCI6ImFwcCIsIm9pZCI6ImQ0NDA3YjQ4LWZmZTctNDJjNS04ZDIwLTdiMTTgwNWE4NCIsInJoIjoiMS5BUnNBbWRfVERDX2cyRS1uMkRzWjdxMnJwUU1BQUFBQUFBQUF3QUFBQUFBQUFBRFlBQUFiQUEuIiwic3ViIjoiZDQ0MDdiNDgtZmZlNy00MmM1LThkMjAtN2IxMzU5ODA1YTg0IiwidGVuYW50X3JlZ2lvbl9zY29wZSI6Ik5BIiwidGlkIjoiMGNkM2RmOTktZTAyZi00ZmQ4LWE3ZDgtM2IxOWVlYWRhYmE1IiwidXRpIjoiLVA1T3JPWGpJVWk0VE12dElTYWRBQSIsInZlciI6IjEuMCIsIndpZHMiOlsiMDk5N2ExZDAtMGQxZC00YWNiLWI0MDgtZDVjYTczMTIxZTkwIl0sInhtc19pZHJlbCI6IjI4IDciLCJ4bXNfdGNkdCI6MTQ4NjM3NDQ2MH0.a9z3ZYyMTRQCoY7dzPYE55DmpNAxqo4a4rrt80A-RpK0NDDAftNkc2hafbLl6gdwEzqRyKc1HExUggFUpKxaLUXc62-u-9emxC12EsNlQYd-ZzG_GRDNoTYrro4RDRL-_gDo2lgBNOi5ZZ4a9UI_pYVvV1b0SBRpgd5bmIV4kI2tDfAVZ1-HMpGscuVkQIy45Tqt4c3gXPoMEZ3UYikbCpErbTNfUFqngE3sARXRV-rB1OMu6ZbN32ijjL-rD8593-IfSpmVDUfE5CMGc-7FuWGOYyUUJmp5AQ1yFpJzqaDBEdPT8kKync1o7eplWXCsPWOnVvAKNf7BuWCRRedBWg"
}

提交 Apache Livy Spark 批处理作业

在提交批量处理作业之前,必须在与群集关联的群集存储上上传应用程序 jar 文件。 可以使用命令行实用工具 AzCopy 执行此作。 可以使用各种其他客户端上传数据。 可以在 HDInsight 中的 Apache Hadoop 作业的上传数据中找到有关它们的详细信息。

curl -k -v -H "Content-Type: application/json" -H "Authorization: Bearer " -X POST -d '{ "file":"<path to application jar>", "className":"<classname in jar>" }' 'https://<spark_cluster_name>.azurehdinsight.cn/livy/batches' -H "X-Requested-By: admin"

示例

  • 如果 jar 文件位于群集存储(WASBS):

    curl -k -v -H "Content-Type: application/json" -H "Authorization: Bearer " -X POST -d '{ "file":"wasbs://mycontainer@mystorageaccount.blob.core.chinacloudapi.cn/data/SparkSimpleTest.jar", "className":"com.microsoft.spark.test.SimpleFile" }' "https://mysparkcluster.azurehdinsight.cn/livy/batches" -H "X-Requested-By: admin"
    
    
  • 如果要将 jar 文件名和类名作为输入文件的一部分传递(在此示例中,input.txt)

    curl -k -v -H "Content-Type: application/json" -H "Authorization: Bearer " -X POST --data @C:\Temp\input.txt "https://mysparkcluster.azurehdinsight.cn/livy/batches" -H "X-Requested-By: admin"
    
    

获取有关群集上运行的 Livy Spark 批处理的信息

Syntax:

curl -k -v -H "Authorization: Bearer " -X GET "https://<spark_cluster_name>.azurehdinsight.cn/livy/batches"

示例

  • 如果要检索群集上运行的所有 Livy Spark 批处理:

    curl -k -v -H "Authorization: Bearer " -X GET "https://mysparkcluster.azurehdinsight.cn/livy/batches"
    
    
  • 如果要检索具有给定批次 ID 的特定批次

    curl -k -v -H "Authorization: Bearer " -X GET "https://mysparkcluster.azurehdinsight.cn/livy/batches/{batchId}"
    
    

删除 Livy Spark 批处理任务

	curl -k -v -H "Authorization: Bearer " -X DELETE "https://<spark_cluster_name>.azurehdinsight.cn/livy/batches/{batchId}"

Example

删除具有批 ID 5的批处理作业。

curl -k -v -H "Authorization: Bearer " -X DELETE "https://mysparkcluster.azurehdinsight.cn/livy/batches/5"

Livy Spark 和高可用性

Livy 为群集上运行的 Spark 作业提供高可用性。 下面是几个示例。

  • 如果将作业远程提交到 Spark 群集后 Livy 服务出现故障,该作业将继续在后台运行。 还原 Livy 后,它将还原作业的状态并报告回来。
  • 适用于 HDInsight 的 Jupyter Notebook 由后端中的 Livy 提供支持。 如果笔记本正在运行 Spark 作业并重新启动 Livy 服务,笔记本将继续运行代码单元。

显示一个示例

在本部分中,我们将介绍使用 Livy Spark 提交批处理作业、监视作业进度,然后删除该作业的示例。 此示例中使用的应用程序是创建 独立 Scala 应用程序并在 HDInsight Spark 群集上运行的文章中开发的应用程序。 此处的步骤假定:

  • 已将应用程序 jar 复制到与群集关联的存储帐户。
  • 你已在这台计算机上安装了 cURL,这是你正在尝试这些步骤的地方。

执行以下步骤:

  1. 为便于使用,请设置环境变量。 此示例基于 Windows 环境,根据需要修改环境变量。 替换CLUSTERNAMETOKEN为相应的值。

    set clustername=CLUSTERNAME
    set token=TOKEN
    
    
  2. 验证 Livy Spark 是否在群集上运行。 可以通过获取正在运行的批处理任务列表来做到这一点。 如果首次使用 Livy 运行作业,则输出应返回零。

    curl -k -v -H "Authorization:%token%" -X GET "https://%clustername%.azurehdinsight.cn/livy/batches"
    
    

    应获得类似于以下代码片段的输出:

    输出

    < HTTP/1.1 200 OK
    < Content-Type: application/json; charset=UTF-8
    < Server: Microsoft-IIS/8.5
    < X-Powered-By: ARR/2.5
    < X-Powered-By: ASP.NET
    < Date: Fri, 20 Nov 2015 23:47:53 GMT
    < Content-Length: 34
    <
    {"from":0,"total":0,"sessions":[]}* Connection #0 to host mysparkcluster.azurehdinsight.cn left intact
    
    

    请注意输出中的最后一行显示 total:0,这表示没有正在运行的批处理。

  3. 现在让我们提交批处理作业。 以下代码片段使用输入文件(input.txt)将 jar 名称和类名作为参数传递。 如果从 Windows 计算机运行这些步骤,建议使用输入文件。

    curl -k -v -H "Content-Type: application/json" -H "Authorization:%token%" -X POST --data @C:\Temp\input.txt "https://%clustername%.azurehdinsight.cn/livy/batches" -H "X-Requested-By: admin"
    
    

    文件 input.txt 中的参数定义如下:

    { "file":"wasbs:///example/jars/SparkSimpleApp.jar", "className":"com.microsoft.spark.example.WasbIOTest" }
    
    

    应会看到类似于以下代码片段的输出:

    输出

    < HTTP/1.1 201 Created
    < Content-Type: application/json; charset=UTF-8
    < Location: /0
    < Server: Microsoft-IIS/8.5
    < X-Powered-By: ARR/2.5
    < X-Powered-By: ASP.NET
    < Date: Fri, 20 Nov 2015 23:51:30 GMT
    < Content-Length: 36
    <
    {"id":0,"state":"starting","log":[]}* Connection #0 to host mysparkcluster.azurehdinsight.cn left intact
    
    

    请注意输出的最后一行如何显示 state:starting。 它还说, id:0。 此处, 0 是批 ID。

  4. 现在可以使用批 ID 检索此特定批的状态。

    curl -k -v -H "Authorization:%token%" -X GET "https://%clustername%.azurehdinsight.cn/livy/batches/0"
    
    

    应会看到类似于以下代码片段的输出:

    输出

    < HTTP/1.1 200 OK
    < Content-Type: application/json; charset=UTF-8
    < Server: Microsoft-IIS/8.5
    < X-Powered-By: ARR/2.5
    < X-Powered-By: ASP.NET
    < Date: Fri, 20 Nov 2015 23:54:42 GMT
    < Content-Length: 509
    <
    {"id":0,"state":"success","log":["\t diagnostics: N/A","\t ApplicationMaster host: 10.0.0.4","\t ApplicationMaster RPC port: 0","\t queue: default","\t start time: 1448063505350","\t final status: SUCCEEDED","\t tracking URL: http://myspar.lpel.jx.internal.chinacloudapp.cn:8088/proxy/application_1447984474852_0002/","\t user: root","15/11/20 23:52:47 INFO Utils: Shutdown hook called","15/11/20 23:52:47 INFO Utils: Deleting directory /tmp/spark-b72cd2bf-280b-4c57-8ceb-9e3e69ac7d0c"]}* Connection #0 to host mysparkcluster.azurehdinsight.cn left intact
    
    

    输出现在显示 state:success,这表明作业已成功完成。

  5. 如果需要,现在可以删除批处理。

    curl -k -v -H "Authorization:%token%" -X DELETE "https://%clustername%.azurehdinsight.cn/livy/batches/0"
    
    

    应会看到类似于以下代码片段的输出:

    输出

    < HTTP/1.1 200 OK
    < Content-Type: application/json; charset=UTF-8
    < Server: Microsoft-IIS/8.5
    < X-Powered-By: ARR/2.5
    < X-Powered-By: ASP.NET
    < Date: Sat, 21 Nov 2015 18:51:54 GMT
    < Content-Length: 17
    <
    {"msg":"deleted"}* Connection #0 to host mysparkcluster.azurehdinsight.cn left intact
    
    

    输出的最后一行显示已成功删除批处理。 如果删除作业,则在作业运行时也会终止该作业。 如果您删除已完成的作业,无论是否成功,其作业信息将被完全删除。

HDInsight 5.1 版本开始对 Livy 配置进行了更新

默认情况下,HDInsight 3.5 群集及更高版本禁用使用本地文件路径来访问示例数据文件或 jar。 建议改用 wasbs:// 路径从群集访问 jar 或示例数据文件。

在 Azure 虚拟网络中提交群集的 Livy 作业

如果从 Azure 虚拟网络中连接到 HDInsight Spark 群集,可以直接连接到群集上的 Livy。 在这种情况下,Livy 终结点的 URL 为 http://<IP address of the headnode>:8998/batches。 此处, 8998 是 Livy 在群集头节点上运行的端口。 有关访问非公共端口上的服务的详细信息,请参阅 HDInsight 上的 Apache Hadoop 服务使用的端口