本快速入门指南介绍如何使用适用于事件中心的 Azure 架构注册表验证 Apache Kafka 应用程序中的事件。
在此用例中,Kafka 生成者应用程序使用存储在 Azure 架构注册表中的 Avro 架构,序列化事件并将其发布到 Azure 事件中心中的 Kafka 主题/事件中心。 Kafka 消费者对从事件中心消费的事件进行反序列化。 为此,它使用事件架构 ID 和存储在 Azure 架构注册表中的 Avro 架构。
先决条件
如果不熟悉 Azure 事件中心,请参阅 事件中心概述 ,然后再执行本快速入门。
若要完成本快速入门,需要满足以下先决条件:
- 如果没有 Azure 订阅,请在开始之前创建 一个试用订阅 。
- 在开发环境中,安装以下组件:
- Java 开发工具包 (JDK) 1.7+。
- 下载 并 安装 Maven 二进制存档。
- Git
- 克隆 Kafka 存储库的 Azure 架构注册表 。
创建事件中心
按照快速入门中的说明作: 创建事件中心命名空间和事件中心 以创建事件中心命名空间和事件中心。 然后,按照 “获取连接字符串 ”中的说明获取事件中心命名空间的连接字符串。
请记下您在当前快速入门中使用的以下设置:
- 事件中心命名空间的连接字符串
- 事件中心的名称
创建架构
按照 使用架构注册表创建架构 中的说明创建架构组和架构。
使用架构注册表门户创建名为 contoso-sg 的架构组。 将 Avro 用作序列化类型,将 None 用于兼容性模式。
在该架构组中,创建一个新的 Avro 架构,架构名称为:
Microsoft.Azure.Data.SchemaRegistry.example.Order,并使用以下架构内容。{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
注册应用程序以访问架构注册表
可以使用 Microsoft Entra ID 通过从 Azure 门户将客户端应用程序注册到 Microsoft Entra 租户来授权 Kafka 生成者和使用者应用程序访问 Azure 架构注册表资源。
若要注册名为 example-app Microsoft Entra 应用程序,请参阅向 Microsoft Entra 租户注册应用程序。
- tenant.id - 设置应用程序的租户 ID
- client.id - 设置应用程序的客户端 ID
- client.secret - 设置用于身份验证的客户端密码
如果使用托管身份标识,则需要:
- use.managed.identity.credential - 指示应使用 MSI 凭据,应在启用 MSI 的 VM 上使用
- managed.identity.clientId - 如果指定,它将生成具有给定客户端 ID 的 MSI 凭据
- managed.identity.resourceId - 如果指定,它将生成具有给定资源 ID 的 MSI 凭据
将用户添加到架构注册表读取者角色
将用户帐户添加到命名空间级别的 架构注册表读取者 角色。 还可以使用 Schema Registry Contributor 角色,但在此快速入门中不必这样做。
- 在 “事件中心命名空间 ”页上,选择左侧菜单上的 访问控制(IAM )。
- 在 “访问控制”(IAM) 页上,选择菜单上的“ + 添加 ->添加角色分配 ”。
- 在 “分配类型 ”页上,选择“ 下一步”。
- 在“ 角色 ”页上,选择 “架构注册表读取者”(预览版),然后选择页面底部的“ 下一步 ”。
- 使用 “+ 选择成员 ”链接将你在上一步中创建的应用程序添加到
example-app角色,然后选择“ 下一步”。 - 在“查看 + 分配”页面上,选择“查看 + 分配”。
更新 Kafka 应用程序的客户端应用程序配置
需要使用与我们创建的 Microsoft Entra 应用程序以及架构注册表信息相关的配置来更新 Kafka 生产者和消费者应用程序的客户端配置。
若要更新 Kafka 生成者配置,请导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer。
按照适用于事件中心的 Kafka 快速入门指南更新 src/main/resources/app.properties 中 Kafka 应用程序的配置。
使用架构注册表相关的配置和您在上面创建的 Microsoft Entra 应用程序,更新 src/main/resources/app.properties 中生成者的配置详细信息,如下所示:
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.chinacloudapi.cn tenant.id=<> client.id=<> client.secret=<>请按照相同的说明并更新 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer 配置。
对于 Kafka 生成者和使用者应用程序,使用以下 Avro 架构:
{ "namespace": "com.azure.schemaregistry.samples", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
将 Kafka 生成者与 Avro 架构验证配合使用
若要运行 Kafka 生成者应用程序,请导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer。
可以运行生成者应用程序,以便它可以生成特定于 Avro 的记录或通用记录。 对于特定记录模式,您首先需要使用以下 Maven 命令针对生产者架构生成类。
mvn generate-sources然后,可以使用以下命令运行生成者应用程序。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"成功执行生成者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项 1 - 生成 Avro SpecificRecords。
Enter case number: 1 - produce Avro SpecificRecords 2 - produce Avro GenericRecords成功进行数据序列化和发布后,应在生成者应用程序中看到以下控制台日志:
INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
使用带有 Avro 模式验证的 Kafka 消费者
若要运行 Kafka 使用者应用程序,请导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer。
可以运行使用者应用程序,以便它可以使用 Avro 特定记录或通用记录。 对于特定记录模式,您首先需要使用以下 Maven 命令针对生产者架构生成类。
mvn generate-sources然后,可以使用以下命令运行使用者应用程序。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"成功执行使用者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项 1 - 使用 Avro SpecificRecords。
Enter case number: 1 - consume Avro SpecificRecords 2 - consume Avro GenericRecords成功使用和反序列化数据后,应在生成者应用程序中看到以下控制台日志:
INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
清理资源
删除事件中心命名空间或删除包含命名空间的资源组。