使用基于证书的服务主体身份验证,将 Apache Spark 应用程序日志和指标收集到 Azure 存储账户中

Apache Spark 诊断发出器扩展是一个库,它允许 Spark 应用程序将日志、事件日志和指标发送到目标,例如Azure 存储帐户、Azure Log Analytics和Azure 存储。

本教程介绍如何创建所需的Azure资源,并使用证书和服务主体配置 Spark 应用程序,以使用 Apache Spark 诊断发射器扩展向Azure 存储帐户发出日志、事件日志和指标。

先决条件

注意

若要完成本教程的步骤,需要有权访问为其分配有“所有者”角色的资源组。

第 1 步。 注册应用程序

  1. 登录到 Azure 门户 并转到 应用注册

  2. 为 Synapse 工作区创建新的应用注册。

    显示创建新应用注册的屏幕截图。

步骤 2. 在 密钥保管库 中生成证书

  1. 导航至密钥保管库 (密钥保管库)。

  2. 展开 对象,然后选择 “证书”。

  3. 单击“生成/导入”。

    显示为应用生成新证书的屏幕截图。

第 3 步: 信任应用程序中的证书

  1. 转到在步骤 1 >管理>清单中创建的应用。

  2. 将证书详细信息追加到清单文件以建立信任。

         "trustedCertificateSubjects": [ 
              { 
              "authorityId": "00000000-0000-0000-0000-000000000001", 
              "subjectName": "Your-Subject-of-Certificate", 
              "revokedCertificateIdentifiers": [] 
              } 
         ] 
    

    屏幕截图显示了在应用程序中信任证书。

步骤 4. 分配存储 Blob 数据参与者角色

  1. 在Azure 存储帐户中,导航到访问控制(IAM)。

  2. 存储 Blob 数据贡献者 角色分配给应用程序(服务主体)。

    显示分配存储 Blob 数据贡献者角色的屏幕截图。

步骤 5. 在 Synapse 中创建链接服务

  1. 在 Synapse Analytics 工作区中,转到 “管理>链接服务”。

  2. 在 Synapse 中创建新的 链接服务 以连接到 密钥保管库

    显示在 synapse 中创建链接服务的屏幕截图。

步骤 6。 在 密钥保管库 中为链接服务分配读取者角色

  1. 获取链接服务的工作区托管标识 ID。 链接服务的托管标识名称和对象 ID 位于“编辑链接服务”下

    屏幕截图显示了托管标识名称和对象 ID 位于“编辑链接服务”中。

  2. 密钥保管库 中,为链接服务分配密钥保管库证书用户角色。

步骤 7. 配置链接服务

收集以下值并添加到 Apache Spark 配置。

  • <EMITTER_NAME>:发射器的名称。
  • < >CERTIFICATE_NAME:在密钥保管库中生成的证书名称。
  • <LINKED_SERVICE_NAME>:Azure Key Vault 链接服务名称。
  • <STORAGE_URI>:Blob 存储目标路径(例如,https://accountname.blob.core.chinacloudapi.cn/containername)。
  • <SERVICE_PRINCIPAL_TENANT_ID>:服务主体租户 ID,可以在App 注册中的 >你的应用名称> 概述>目录(租户)ID查看。
  • < >SERVICE_PRINCIPAL_CLIENT_ID:服务主体客户端 ID,可以在注册>应用名称>概述>应用程序(客户端)ID 中找到它
     "spark.synapse.diagnostic.emitters": <EMITTER_NAME>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type": "AzureStorage",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories": "DriverLog,ExecutorLog,EventLog,Metrics",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.uri": "<STORAGE_URI>",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.auth": "ServicePrincipalCert",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName": <CERTIFICATE_NAME>",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.linkedService": <LINKED_SERVICE_NAME>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId": <SERVICE_PRINCIPAL_TENANT_ID>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId": <SERVICE_PRINCIPAL_CLIENT_ID>

步骤 8。 提交 Apache Spark 应用程序并查看日志和指标

可以使用 Apache Log4j 库编写自定义日志。

Scala 的示例:

%%spark
val logger = org.apache.log4j.LogManager.getLogger("com.contoso.LoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
//log exception
try {
      1/0
 } catch {
      case e:Exception =>logger.warn("Exception", e)
}
// run job for task level metrics
val data = sc.parallelize(Seq(1,2,3,4)).toDF().count()

PySpark 的示例:

%%pyspark
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.contoso.PythonLoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")