将 JSON 架构与 Apache Kafka 应用程序配合使用

本教程将指导你在事件中心中使用 Azure 架构注册表,通过 JSON 架构对事件进行序列化和反序列化的步骤。

在此用例中,Kafka 生成者应用程序使用存储在 Azure 架构注册表中的 JSON 架构,序列化事件并将其发布到 Azure 事件中心中的 Kafka 主题/事件中心。 Kafka 消费者对从事件中心消费的事件进行反序列化。 为此,它使用存储在 Azure 架构注册表中的事件的架构 ID 和 JSON 架构。 显示使用 JSON 架构的 Kafka 应用程序的架构序列化/取消序列化的关系图。

先决条件

如果不熟悉 Azure 事件中心,请参阅 事件中心概述 ,然后再执行本快速入门。

若要完成本快速入门,需要满足以下先决条件:

创建事件中心

按照快速入门中的说明作: 创建事件中心命名空间和事件中心 以创建事件中心命名空间和事件中心。 然后,按照 “获取连接字符串 ”中的说明获取事件中心命名空间的连接字符串。

请记下您在当前快速入门中使用的以下设置:

  • 事件中心命名空间的连接字符串
  • 事件中心的名称

创建架构

按照 使用架构注册表创建架构 中的说明创建架构组和架构。

  1. 使用架构注册表门户创建名为 contoso-sg 的架构组。 使用 JSON 架构 作为序列化类型。

  2. 在该架构组中,使用架构名称创建新的 JSON 架构: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice 使用以下架构内容。

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

注册应用程序以访问架构注册表

可以使用 Microsoft Entra ID 来授权 Kafka 生成者和使用者应用程序访问 Azure 架构注册表资源。 若要启用它,需要从 Azure 门户向 Microsoft Entra 租户注册客户端应用程序。

若要注册名为 example-app Microsoft Entra 应用程序,请参阅向 Microsoft Entra 租户注册应用程序

  • tenant.id - 设置应用程序的租户 ID
  • client.id - 设置应用程序的客户端 ID
  • client.secret - 设置用于身份验证的客户端密码

如果使用托管身份标识,则需要:

  • use.managed.identity.credential - 指示应使用 MSI 凭据,应在启用 MSI 的 VM 上使用
  • managed.identity.clientId - 如果指定,则生成具有给定客户端 ID managed.identity.resourceId 的 MSI 凭据 - 如果指定,则生成具有给定资源 ID 的 MSI 凭据

将用户添加到架构注册表读取者角色

将用户帐户添加到命名空间级别的 架构注册表读取者 角色。 还可以使用 Schema Registry Contributor 角色,但在此快速入门中不必这样做。

  1. “事件中心命名空间 ”页上,选择左侧菜单上的 访问控制(IAM )。
  2. “访问控制”(IAM) 页上,选择菜单上的“ + 添加 ->添加角色分配 ”。
  3. “分配类型 ”页上,选择“ 下一步”。
  4. 在“ 角色 ”页上,选择 “架构注册表读取者”,然后选择页面底部的“ 下一步 ”。
  5. 使用 “+ 选择成员 ”链接将你在上一步中创建的应用程序添加到 example-app 角色,然后选择“ 下一步”。
  6. 在“查看 + 分配”页面上,选择“查看 + 分配”

更新 Kafka 应用程序的客户端应用程序配置

需要使用 Microsoft Entra 应用程序详细信息以及架构注册表信息更新 Kafka 生成者和使用者应用程序的客户端配置。

若要更新 Kafka 生成者配置,请导航到 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer

  1. 按照适用于事件中心的 Kafka 快速入门指南更新 src/main/resources/app.properties 中 Kafka 应用程序的配置。

  2. 使用架构注册表相关的配置和在上一步中创建的 Microsoft Entra 应用程序,更新 src/main/resources/app.properties 中生产者的配置详细信息,如下所示:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.chinacloudapi.cn
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. 按照相同的说明并更新 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer 配置。

  4. 对于 Kafka 生成者和使用者应用程序,使用以下 JSON 架构:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

将 Kafka 生成者与 JSON 架构验证配合使用

若要运行 Kafka 生成者应用程序,请导航到 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer

  1. 可以运行生成者应用程序,以便它可以生成 JSON 架构特定的记录或通用记录。 对于特定记录模式,您首先需要使用以下 Maven 命令针对生产者架构生成类。

    mvn generate-sources
    
  2. 然后,可以使用以下命令运行生成者应用程序。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. 成功执行生成者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项 1 - 生成 SpecificRecords

    Enter case number:
    1 - produce SpecificRecords
    
  4. 成功进行数据序列化和发布后,应在生成者应用程序中看到以下控制台日志:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

将 Kafka 消费者与 JSON 模式验证结合使用

若要运行 Kafka 使用者应用程序,请导航到 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer

  1. 可以运行使用者应用程序,以便它可以使用 JSON 架构特定的记录或通用记录。 对于特定记录模式,您首先需要使用以下 Maven 命令针对生产者架构生成类。

    mvn generate-sources
    
  2. 然后,可以使用以下命令运行使用者应用程序。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. 成功执行使用者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项 1 - 使用 SpecificRecords

    Enter case number:
    1 - consume SpecificRecords
    
  4. 成功使用和反序列化数据后,应在生成者应用程序中看到以下控制台日志:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

清理资源

删除事件中心命名空间或删除包含命名空间的资源组。