In this article, you deploy the Strimzi Cluster Operator and a highly available Kafka cluster on AKS.
Note

If you haven't yet created the required infrastructure for this deployment, follow the steps in Prepare the infrastructure for deploying Kafka on Azure Kubernetes Service (AKS) to set it up, and then return to this article.
Strimzi deployment
The Strimzi Cluster Operator is deployed in its own namespace, `strimzi-operator`, and configured to watch the `kafka` namespace where the Kafka cluster components are deployed. To ensure high availability, the operator uses:
- Multiple replicas with leader election: One replica acts as the active leader that manages the deployed resources, while the other replicas remain on standby. If the leader fails, a standby replica takes over.
- Zonal distribution: Three replicas, one per availability zone, provide resilience against zonal outages. Pod anti-affinity rules prevent multiple instances from being scheduled in the same zone.
- Pod disruption budget: Created automatically by the operator deployment to ensure at least one replica remains available during voluntary disruptions.
This architecture keeps the Strimzi Cluster Operator highly available even during infrastructure maintenance or partial outages.
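To see leader election in action, you can inspect the Kubernetes Lease that the operator replicas coordinate on. This is a minimal sketch assuming the default lease name used by the Helm chart (`strimzi-cluster-operator`); the Lease's `holderIdentity` field identifies the pod currently acting as leader.

```bash
# List leases in the operator namespace
kubectl get lease -n strimzi-operator

# Show which operator pod currently holds leadership
kubectl get lease strimzi-cluster-operator -n strimzi-operator \
  -o jsonpath='{.spec.holderIdentity}'
```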
Install the Strimzi Cluster Operator using Helm
Create namespaces for the Strimzi Cluster Operator and the Kafka cluster using the `kubectl create namespace` command.

```bash
kubectl create namespace strimzi-operator
kubectl create namespace kafka
```
Create a `values.yaml` file to provide specific configuration for the Helm chart using the following script.

```bash
cat <<EOF > values.yaml
replicas: 3
watchNamespaces:
  - kafka
leaderElection:
  enabled: true
podDisruptionBudget:
  enabled: true
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
            - key: name
              operator: In
              values:
                - strimzi-cluster-operator
        topologyKey: topology.kubernetes.io/zone
EOF
```
Install the Strimzi Cluster Operator using the `helm install` command.

```bash
helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator \
  --namespace strimzi-operator \
  --values values.yaml
```
Verify that the Strimzi Cluster Operator deployed successfully and that all pods are in a running state using the `kubectl get` command.

```bash
kubectl get pods -n strimzi-operator
```
Your output should look similar to the following example output:
```output
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-6f7588bb79-bvfdp   1/1     Running   0          1d22h
strimzi-cluster-operator-6f7588bb79-lfcp6   1/1     Running   0          1d22h
strimzi-cluster-operator-6f7588bb79-qdlm8   1/1     Running   0          1d22h
```
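To confirm the zonal distribution described earlier, you can cross-reference each operator pod's node with that node's zone label:

```bash
# Show which node each operator replica landed on
kubectl get pods -n strimzi-operator -o wide

# Map nodes to availability zones using the well-known topology label
kubectl get nodes -L topology.kubernetes.io/zone
```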
Install the Strimzi Drain Cleaner using Helm
The Strimzi Drain Cleaner ensures smooth draining of Kubernetes nodes by intercepting eviction requests for broker pods. This prevents Kafka partition replicas from becoming under-replicated, which maintains Kafka cluster health and reliability.

For high availability, the Drain Cleaner should be deployed with multiple replicas spread across availability zones and configured with a pod disruption budget, so that it remains operational during zonal outages or cluster upgrades.

A Helm chart is available to install the Strimzi Drain Cleaner:
Create a namespace for the Drain Cleaner using the `kubectl create namespace` command.

```bash
kubectl create namespace strimzi-drain-cleaner
```
Create a `values.yaml` file to override specific configuration for the Helm chart using the following script.

```bash
cat <<EOF > values.yaml
replicaCount: 3
namespace:
  create: false
podDisruptionBudget:
  create: true
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
            - key: app
              operator: In
              values:
                - strimzi-drain-cleaner
        topologyKey: topology.kubernetes.io/zone
EOF
```
Install the Strimzi Drain Cleaner using the `helm install` command.

```bash
helm install strimzi-drain-cleaner oci://quay.io/strimzi-helm/strimzi-drain-cleaner \
  --namespace strimzi-drain-cleaner \
  --values values.yaml
```
Verify that the Strimzi Drain Cleaner deployed successfully and that all pods are in a running state using the `kubectl get` command.

```bash
kubectl get pods -n strimzi-drain-cleaner
```
Your output should look similar to the following example output:
```output
NAME                                     READY   STATUS    RESTARTS   AGE
strimzi-drain-cleaner-6d694bd55b-dshkp   1/1     Running   0          1d22h
strimzi-drain-cleaner-6d694bd55b-l8cbf   1/1     Running   0          1d22h
strimzi-drain-cleaner-6d694bd55b-wj6xx   1/1     Running   0          1d22h
```
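With the Drain Cleaner running, a standard node drain exercises the behavior described above: eviction requests for broker pods are intercepted so Strimzi can roll the affected brokers safely instead of letting them be evicted immediately. The node name below is a placeholder; substitute one of your own Kafka nodes.

```bash
# Drain a node hosting Kafka pods (hypothetical node name)
kubectl drain aks-kafka-00000000-vmss000000 \
  --ignore-daemonsets \
  --delete-emptydir-data
```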
Kafka cluster architecture and considerations
The Strimzi Cluster Operator uses custom resource definitions (CRDs) to enable declarative Kafka deployments on AKS. Starting with Strimzi 0.46, Kafka clusters use KRaft, which is built into Kafka itself, instead of ZooKeeper.

Strimzi uses the KafkaNodePool custom resource, with each pool assigned a specific role (broker, controller, or both):
- Kafka brokers handle message processing and storage.
- Kafka controllers manage Kafka metadata using the Raft consensus protocol.
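For illustration, here's a minimal sketch of a node pool that combines both roles, which can be reasonable for small non-production clusters; the pool name and storage size are placeholders, not values used later in this article. The production architecture below keeps the roles separate instead.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: kafka-aks-cluster
spec:
  replicas: 3
  roles:
    # A single pool can act as both controller and broker
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
```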
For high availability, our target architecture is defined by the following:

- Separate KafkaNodePools for broker and controller nodes, with three replicas per node pool.
- Topology spread constraints that distribute pods across availability zones and nodes.
- Node affinity rules that use a dedicated node pool to optimize resource utilization.
- Persistent volumes on Azure Container Storage, with separate volumes for broker messages and metadata.

This architecture improves scalability and fault tolerance while allowing brokers and controllers to be scaled independently to meet workload requirements.
JVM configuration for production Kafka clusters
Tuning the Java Virtual Machine (JVM) is critical for optimal Kafka broker and controller performance, especially in production environments. Properly configured JVM settings help maximize throughput, minimize latency, and keep each broker stable under heavy load.

LinkedIn, the creator of Kafka, shared the typical arguments for running Kafka on Java, as used by one of LinkedIn's busiest clusters: Apache Kafka Java configuration. We use this configuration as the baseline for the Kafka broker configuration. You can make changes to meet specific workload requirements.
```yaml
jvmOptions:
  # Sets initial and maximum heap size to 6GB - critical for memory-intensive Kafka operations
  # Equal sizing prevents resizing pauses
  "-Xms": "6g"
  "-Xmx": "6g"
  "-XX":
    # Initial metaspace size (class metadata storage area) at 96MB
    "MetaspaceSize": "96m"
    # Enables the Garbage-First (G1) garbage collector, optimized for better predictability and lower pause times
    "UseG1GC": "true"
    # Targets maximum GC pause time of 20ms - keeps latency predictable
    "MaxGCPauseMillis": "20"
    # Starts concurrent GC cycle when heap is 35% full - balances CPU overhead and frequency
    "InitiatingHeapOccupancyPercent": "35"
    # Sets G1 heap region size to 16MB - affects collection efficiency and pause times
    "G1HeapRegionSize": "16M"
    # Keeps at least 50% free space after metaspace GC - prevents frequent resizing
    "MinMetaspaceFreeRatio": "50"
    # Limits expansion to allow up to 80% free space in metaspace after GC
    "MaxMetaspaceFreeRatio": "80"
    # Makes explicit System.gc() calls run concurrently instead of stopping all threads
    "ExplicitGCInvokesConcurrent": "true"
```
Deploy the Kafka node pools

In this section, we create two Kafka node pools: one for brokers and one for controllers.
Apply the following YAML manifest to create the two Kafka node pools using the `kubectl apply` command.

```bash
kubectl apply -n kafka -f - <<EOF
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller
  labels:
    strimzi.io/cluster: kafka-aks-cluster
spec:
  replicas: 3
  roles:
    - controller
  resources:
    requests:
      memory: 4Gi
    limits:
      memory: 6Gi
  template:
    pod:
      metadata:
        labels:
          kafkaRole: controller
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    kafkaRole: broker
                topologyKey: kubernetes.io/hostname
      topologySpreadConstraints:
        - labelSelector:
            matchLabels:
              kafkaRole: controller
          maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
        - labelSelector:
            matchLabels:
              kafkaRole: controller
          maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: ScheduleAnyway
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 25Gi
        kraftMetadata: shared
        deleteClaim: false
        class: acstor-azuredisk-zr
  jvmOptions:
    "-Xms": "3g"
    "-Xmx": "3g"
    "-XX":
      "MetaspaceSize": "96m"
      "UseG1GC": "true"
      "MaxGCPauseMillis": "20"
      "InitiatingHeapOccupancyPercent": "35"
      "G1HeapRegionSize": "16M"
      "MinMetaspaceFreeRatio": "50"
      "MaxMetaspaceFreeRatio": "80"
      "ExplicitGCInvokesConcurrent": "true"
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  labels:
    strimzi.io/cluster: kafka-aks-cluster
spec:
  replicas: 3
  roles:
    - broker
  resources:
    requests:
      memory: 8Gi
    limits:
      memory: 10Gi
  template:
    pod:
      metadata:
        labels:
          kafkaRole: broker
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    kafkaRole: controller
                topologyKey: kubernetes.io/hostname
      topologySpreadConstraints:
        - labelSelector:
            matchLabels:
              kafkaRole: broker
          maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
        - labelSelector:
            matchLabels:
              kafkaRole: broker
          maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: ScheduleAnyway
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 50Gi
        deleteClaim: false
        class: acstor-azuredisk-zr
      - id: 1
        type: persistent-claim
        size: 25Gi
        kraftMetadata: shared
        deleteClaim: false
        class: acstor-azuredisk-zr
  jvmOptions:
    "-Xms": "6g"
    "-Xmx": "6g"
    "-XX":
      "MetaspaceSize": "96m"
      "UseG1GC": "true"
      "MaxGCPauseMillis": "20"
      "InitiatingHeapOccupancyPercent": "35"
      "G1HeapRegionSize": "16M"
      "MinMetaspaceFreeRatio": "50"
      "MaxMetaspaceFreeRatio": "80"
      "ExplicitGCInvokesConcurrent": "true"
EOF
```
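Before moving on, you can check how the scheduler spread the pods using the `kafkaRole` labels defined in the pod templates above:

```bash
# Show the node placement of broker and controller pods
kubectl get pods -n kafka -l kafkaRole=broker -o wide
kubectl get pods -n kafka -l kafkaRole=controller -o wide
```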
After the Kafka node pools are created, the next step is to define the Kafka cluster custom resource that ties these pools together into a functioning Kafka ecosystem. This architecture follows a separation-of-concerns pattern: the Kafka node pools manage the infrastructure aspects, while the Kafka cluster resource handles application-level configuration.

Deploy the Kafka cluster
Before creating the Kafka cluster, create a ConfigMap containing the JMX Prometheus Exporter configuration using the `kubectl apply` command. This ConfigMap defines how Kafka's internal JMX metrics are transformed into and exposed as Prometheus-format metrics, enabling comprehensive monitoring of the Kafka ecosystem. The patterns defined in this configuration map JMX metric paths to well-formed Prometheus metrics with appropriate types and labels.

```bash
kubectl apply -n kafka -f - <<'EOF'
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  labels:
    app: strimzi
data:
  kafka-metrics-config.yaml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    rules:
      # Special cases and very specific rules
      - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
        name: kafka_server_$1_$2
        type: GAUGE
        labels:
          clientId: "$3"
          topic: "$4"
          partition: "$5"
      - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
        name: kafka_server_$1_$2
        type: GAUGE
        labels:
          clientId: "$3"
          broker: "$4:$5"
      - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
        name: kafka_server_$1_connections_tls_info
        type: GAUGE
        labels:
          cipher: "$2"
          protocol: "$3"
          listener: "$4"
          networkProcessor: "$5"
      - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
        name: kafka_server_$1_connections_software
        type: GAUGE
        labels:
          clientSoftwareName: "$2"
          clientSoftwareVersion: "$3"
          listener: "$4"
          networkProcessor: "$5"
      - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total):"
        name: kafka_server_$1_$4
        type: COUNTER
        labels:
          listener: "$2"
          networkProcessor: "$3"
      - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
        name: kafka_server_$1_$4
        type: GAUGE
        labels:
          listener: "$2"
          networkProcessor: "$3"
      - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total)
        name: kafka_server_$1_$4
        type: COUNTER
        labels:
          listener: "$2"
          networkProcessor: "$3"
      - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
        name: kafka_server_$1_$4
        type: GAUGE
        labels:
          listener: "$2"
          networkProcessor: "$3"
      # Some percent metrics use MeanRate attribute
      # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
      - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
        name: kafka_$1_$2_$3_percent
        type: GAUGE
      # Generic gauges for percents
      - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
        name: kafka_$1_$2_$3_percent
        type: GAUGE
      - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
        name: kafka_$1_$2_$3_percent
        type: GAUGE
        labels:
          "$4": "$5"
      # Generic per-second counters with 0-2 key/value pairs
      - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
        name: kafka_$1_$2_$3_total
        type: COUNTER
        labels:
          "$4": "$5"
          "$6": "$7"
      - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
        name: kafka_$1_$2_$3_total
        type: COUNTER
        labels:
          "$4": "$5"
      - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
        name: kafka_$1_$2_$3_total
        type: COUNTER
      # Generic gauges with 0-2 key/value pairs
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
        name: kafka_$1_$2_$3
        type: GAUGE
        labels:
          "$4": "$5"
          "$6": "$7"
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
        name: kafka_$1_$2_$3
        type: GAUGE
        labels:
          "$4": "$5"
      - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
        name: kafka_$1_$2_$3
        type: GAUGE
      # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
      # Note that these are missing the '_sum' metric!
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
        name: kafka_$1_$2_$3_count
        type: COUNTER
        labels:
          "$4": "$5"
          "$6": "$7"
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
        name: kafka_$1_$2_$3
        type: GAUGE
        labels:
          "$4": "$5"
          "$6": "$7"
          quantile: "0.$8"
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
        name: kafka_$1_$2_$3_count
        type: COUNTER
        labels:
          "$4": "$5"
      - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
        name: kafka_$1_$2_$3
        type: GAUGE
        labels:
          "$4": "$5"
          quantile: "0.$6"
      - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
        name: kafka_$1_$2_$3_count
        type: COUNTER
      - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
        name: kafka_$1_$2_$3
        type: GAUGE
        labels:
          quantile: "0.$4"
      # KRaft overall related metrics
      # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
      - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
        name: kafka_server_raftmetrics_$1
        type: COUNTER
      - pattern: "kafka.server<type=raft-metrics><>(current-state): (.+)"
        name: kafka_server_raftmetrics_$1
        value: 1
        type: UNTYPED
        labels:
          $1: "$2"
      - pattern: "kafka.server<type=raft-metrics><>(.+):"
        name: kafka_server_raftmetrics_$1
        type: GAUGE
      # KRaft "low level" channels related metrics
      # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
      - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
        name: kafka_server_raftchannelmetrics_$1
        type: COUNTER
      - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
        name: kafka_server_raftchannelmetrics_$1
        type: GAUGE
      # Broker metrics related to fetching metadata topic records in KRaft mode
      - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
        name: kafka_server_brokermetadatametrics_$1
        type: GAUGE
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cruise-control-metrics
  labels:
    app: strimzi
data:
  metrics-config.yaml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    rules:
      - pattern: kafka.cruisecontrol<name=(.+)><>(\w+)
        name: kafka_cruisecontrol_$1_$2
        type: GAUGE
EOF
```
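You can confirm that both ConfigMaps exist before referencing them from the Kafka cluster resource:

```bash
kubectl get configmap kafka-metrics cruise-control-metrics -n kafka
```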
Deploy the Kafka cluster resource using the `kubectl apply` command, connecting the previously created node pools to form a complete Kafka ecosystem. This custom resource configures several key components:

- Kafka core configuration: Defines replication factors, listener settings, and other Kafka-specific parameters.
- Cruise Control: Provides automated cluster balancing and monitoring capabilities.
- Entity Operator: Deploys the Topic Operator and User Operator, which manage Kafka topics and users declaratively through Kubernetes resources.
- JMX metrics: Configures metrics exposure using the previously defined ConfigMaps.
```bash
kubectl apply -n kafka -f - <<EOF
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-aks-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.9.0
    metadataVersion: 3.9-IV0
    rack:
      topologyKey: topology.kubernetes.io/zone
    template:
      podDisruptionBudget:
        maxUnavailable: 2
    listeners:
      - name: internal
        port: 9092
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      log.segment.bytes: 1073741824
      log.retention.hours: 168
      log.retention.check.interval.ms: 300000
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yaml
  cruiseControl:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: cruise-control-metrics
          key: metrics-config.yaml
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
```
After deployment, verify that all KafkaNodePools, the Kafka cluster resource, and their corresponding pods are created and in a running state using the `kubectl get` command.

```bash
kubectl get pods,kafkanodepool,kafka -n kafka
```
Your output should look similar to the following example output:
```output
NAME                                                     READY   STATUS    RESTARTS   AGE
pod/kafka-aks-cluster-broker-0                           1/1     Running   0          1d22h
pod/kafka-aks-cluster-broker-1                           1/1     Running   0          1d22h
pod/kafka-aks-cluster-broker-2                           1/1     Running   0          1d22h
pod/kafka-aks-cluster-controller-3                       1/1     Running   0          1d22h
pod/kafka-aks-cluster-controller-4                       1/1     Running   0          1d22h
pod/kafka-aks-cluster-controller-5                       1/1     Running   0          1d22h
pod/kafka-aks-cluster-cruise-control-844b69848-87rf6     1/1     Running   0          1d22h
pod/kafka-aks-cluster-entity-operator-6f949f6774-t8wql   2/2     Running   0          1d22h

NAME                                         DESIRED REPLICAS   ROLES            NODEIDS
kafkanodepool.kafka.strimzi.io/broker        3                  ["broker"]       [0,1,2]
kafkanodepool.kafka.strimzi.io/controller    3                  ["controller"]   [3,4,5]

NAME                                       DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   METADATA STATE   WARNINGS
kafka.kafka.strimzi.io/kafka-aks-cluster
```
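Instead of polling, you can also block until Strimzi reports the cluster as ready; the timeout here is an arbitrary choice:

```bash
kubectl wait kafka/kafka-aks-cluster --for=condition=Ready --timeout=600s -n kafka
```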
Create Kafka users and topics
The Strimzi Entity Operator, deployed through the Kafka cluster custom resource, translates Kubernetes custom resources (`KafkaTopic` and `KafkaUser`) into actual Kafka resources, enabling GitOps workflows and consistent configuration management.
Note

Using the Entity Operator to create Kafka topics and users declaratively is optional. You can also create them using traditional Kafka CLI tools or APIs. However, the declarative approach offers benefits such as version control, audit trails, and consistent management across environments.
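For comparison, the imperative equivalent of the topic created in the next step would look something like the following, run from a client host that can reach the cluster (the bootstrap address is a placeholder):

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server <bootstrap-address>:9092 \
  --topic test-topic \
  --partitions 4 \
  --replication-factor 3 \
  --config retention.ms=7200000
```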
Create a Kafka topic with the Topic Operator using the `kubectl apply` command.

```bash
kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: kafka-aks-cluster
spec:
  replicas: 3
  partitions: 4
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
EOF
```
Verify that the Kafka topic was created successfully using the `kubectl get` command.

```bash
kubectl get kafkatopic -n kafka
```
Your output should look similar to the following example output:
```output
NAME         CLUSTER             PARTITIONS   REPLICATION FACTOR   READY
test-topic   kafka-aks-cluster   4            3                    True
```
For more information, see Manage Kafka topics with the Topic Operator.
Create a Kafka user with the User Operator using the `kubectl apply` command. The user must be created in the same namespace as the Kafka cluster.

```bash
kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: test-user
  labels:
    strimzi.io/cluster: kafka-aks-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: test-topic
          patternType: literal
        operations:
          - Describe
          - Read
        host: "*"
      - resource:
          type: group
          name: test-group
          patternType: literal
        operations:
          - Read
        host: "*"
      - resource:
          type: topic
          name: test-topic
          patternType: literal
        operations:
          - Create
          - Describe
          - Write
        host: "*"
EOF
```
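Because the user is configured with TLS authentication, the User Operator issues client credentials as Kubernetes secrets. Assuming Strimzi's standard naming conventions (a secret named after the `KafkaUser`, and the cluster CA certificate in `<cluster-name>-cluster-ca-cert`), you can retrieve them like this:

```bash
# Client certificate and key for test-user (secret name assumed to match the KafkaUser name)
kubectl get secret test-user -n kafka

# Cluster CA certificate that clients need in order to trust the TLS listener
kubectl get secret kafka-aks-cluster-cluster-ca-cert -n kafka
```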
For more information, see Manage Kafka users with the User Operator.
Next steps

Contributors

Microsoft maintains this article. The following contributors originally wrote it:
- Sergio Navar | Senior Customer Engineer
- Erin Schaffer | Content Developer 2