Configure and deploy Strimzi and Kafka components on Azure Kubernetes Service (AKS)

In this article, you deploy the Strimzi Cluster Operator and a highly available Kafka cluster on AKS.

Note

If you haven't created the required infrastructure for this deployment, follow the steps in Prepare the infrastructure for deploying Kafka on Azure Kubernetes Service (AKS) to set it up, and then return to this article.

Strimzi deployment

The Strimzi Cluster Operator is deployed in its own namespace, strimzi-operator, and is configured to watch the kafka namespace where the Kafka cluster components are deployed. To ensure high availability, the operator uses:

  • Multiple replicas with leader election: One replica acts as the active leader managing the deployed resources, while the other replicas remain on standby. If the leader fails, a standby replica takes over.
  • Zonal distribution: Three replicas, one per availability zone, provide resilience against zonal outages. Pod anti-affinity rules prevent multiple instances from being scheduled in the same zone.
  • Pod Disruption Budget: Created automatically by the operator deployment to ensure at least one replica remains available during voluntary disruptions.

This architecture ensures the Strimzi Cluster Operator remains highly available, even during infrastructure maintenance or partial outages.
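
To see leader election in action after you complete the installation below, you can inspect the Kubernetes Lease object the operator replicas compete for. This is an illustrative check that assumes the chart's default lease name, strimzi-cluster-operator:

# Illustrative check: shows which operator pod currently holds the leader lease
kubectl get lease strimzi-cluster-operator -n strimzi-operator -o jsonpath='{.spec.holderIdentity}'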

Install the Strimzi Cluster Operator using Helm

  1. Create namespaces for the Strimzi Cluster Operator and the Kafka cluster using the kubectl create namespace command.

    kubectl create namespace strimzi-operator  
    kubectl create namespace kafka  
    
  2. Create a values.yaml file to provide specific configuration for the Helm chart using the following script:

    cat <<EOF > values.yaml  
    replicas: 3  
    watchNamespaces: 
      - kafka  
    leaderElection:  
      enabled: true  
    podDisruptionBudget:  
      enabled: true  
    affinity:  
      podAntiAffinity:  
        requiredDuringSchedulingIgnoredDuringExecution:  
          - labelSelector:  
              matchExpressions:  
                - key: name  
                  operator: In  
                  values:  
                    - strimzi-cluster-operator  
            topologyKey: topology.kubernetes.io/zone  
    EOF  
    
  3. Install the Strimzi Cluster Operator using the helm install command.

    helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator \
    --namespace strimzi-operator \
    --values values.yaml
    
  4. Verify that the Strimzi Cluster Operator deployed successfully and that all pods are in a running state using the kubectl get command.

    kubectl get pods -n strimzi-operator  
    

    The output should look similar to the following example output:

    NAME                                        READY   STATUS    RESTARTS      AGE  
    strimzi-cluster-operator-6f7588bb79-bvfdp   1/1     Running   0             1d22h  
    strimzi-cluster-operator-6f7588bb79-lfcp6   1/1     Running   0             1d22h  
    strimzi-cluster-operator-6f7588bb79-qdlm8   1/1     Running   0             1d22h  
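
    Optionally, confirm that the replicas are spread across availability zones and that the Pod Disruption Budget was created. Zone placement follows each node's topology.kubernetes.io/zone label:

    kubectl get pods -n strimzi-operator -o wide
    kubectl get pdb -n strimzi-operator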
    

Install the Strimzi Drain Cleaner using Helm

The Strimzi Drain Cleaner ensures smooth draining of Kubernetes nodes by intercepting eviction requests for broker pods. This prevents Kafka partition replicas from becoming under-replicated, maintaining Kafka cluster health and reliability.

For high availability, the Drain Cleaner should be deployed with multiple replicas across availability zones and configured with a Pod Disruption Budget so that it remains operational during zonal outages or cluster upgrades.

A Helm chart is available to install the Strimzi Drain Cleaner:

  1. Create a namespace for the Drain Cleaner using the kubectl create namespace command.

    kubectl create namespace strimzi-drain-cleaner  
    
  2. Create a values.yaml file to override specific configuration of the Helm chart using the following script.

    cat <<EOF > values.yaml  
    replicaCount: 3  
    namespace:  
      create: false  
    podDisruptionBudget:  
      create: true  
    affinity:  
      podAntiAffinity:  
        requiredDuringSchedulingIgnoredDuringExecution:  
          - labelSelector:  
              matchExpressions:  
                - key: app  
                  operator: In  
                  values:  
                    - strimzi-drain-cleaner  
            topologyKey: topology.kubernetes.io/zone  
    EOF  
    
  3. Install the Strimzi Drain Cleaner using the helm install command.

    helm install strimzi-drain-cleaner oci://quay.io/strimzi-helm/strimzi-drain-cleaner \
    --namespace strimzi-drain-cleaner \
    --values values.yaml
    
  4. Verify that the Strimzi Drain Cleaner deployed successfully and check that all pods are in a running state using the kubectl get command.

    kubectl get pods -n strimzi-drain-cleaner  
    

    The output should look similar to the following example output:

    NAME                                     READY   STATUS    RESTARTS   AGE  
    strimzi-drain-cleaner-6d694bd55b-dshkp   1/1     Running   0          1d22h  
    strimzi-drain-cleaner-6d694bd55b-l8cbf   1/1     Running   0          1d22h  
    strimzi-drain-cleaner-6d694bd55b-wj6xx   1/1     Running   0          1d22h  
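
    Under the hood, the Drain Cleaner registers a validating admission webhook that intercepts pod eviction requests and annotates the affected Kafka pods (with strimzi.io/manual-rolling-update) so the Cluster Operator rolls them safely. As a quick sanity check, the webhook registration should be visible; the exact resource name can vary by chart version:

    kubectl get validatingwebhookconfigurations | grep -i drain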
    

Kafka cluster architecture and considerations

The Strimzi Cluster Operator uses custom resource definitions to enable declarative Kafka deployments on AKS. Starting with Strimzi 0.46, Kafka clusters use KRaft, which is built directly into Kafka, instead of ZooKeeper.

Strimzi uses the KafkaNodePool custom resource to assign each pool a specific role (broker, controller, or both):

  • Kafka brokers handle message processing and storage.
  • Kafka controllers manage Kafka metadata using the Raft consensus protocol.

For high availability, our target architecture is defined by:

  • Separate KafkaNodePools for broker and controller nodes, each with three replicas.
  • Topology spread constraints that distribute pods across availability zones and nodes.
  • Node affinity rules that optimize resource utilization by using a dedicated node pool.
  • Persistent volumes backed by Azure Container Storage, with separate volumes for broker messages and metadata.

This architecture improves scalability and fault tolerance while allowing brokers and controllers to scale independently to meet workload requirements.

JVM configuration for production Kafka clusters

Tuning the Java Virtual Machine (JVM) is critical for optimal Kafka broker and controller performance, especially in production environments. Properly configured JVM settings help maximize throughput, minimize latency, and maintain stability when individual brokers come under heavy load.

LinkedIn, the creator of Kafka, shared the typical arguments for running Kafka on Java that are used by one of LinkedIn's busiest clusters: Apache Kafka Java configuration. We use this configuration as the baseline for the Kafka broker configuration. You can change it to meet specific workload requirements.

jvmOptions:
  # Sets initial and maximum heap size to 6GB - critical for memory-intensive Kafka operations
  # Equal sizing prevents resizing pauses
  "-Xms": "6g"
  "-Xmx": "6g"
  "-XX":
    # Initial metaspace size (class metadata storage area) at 96MB
    "MetaspaceSize": "96m"
    
    # Enables the Garbage-First (G1) garbage collector, optimized for better predictability and lower pause times
    "UseG1GC": "true"
    
    # Targets maximum GC pause time of 20ms - keeps latency predictable
    "MaxGCPauseMillis": "20"
    
    # Starts concurrent GC cycle when heap is 35% full - balances CPU overhead and frequency
    "InitiatingHeapOccupancyPercent": "35"
    
    # Sets G1 heap region size to 16MB - affects collection efficiency and pause times
    "G1HeapRegionSize": "16M"
    
    # Keeps at least 50% free space after metaspace GC - prevents frequent resizing
    "MinMetaspaceFreeRatio": "50"
    
    # Limits expansion to allow up to 80% free space in metaspace after GC
    "MaxMetaspaceFreeRatio": "80"
    
    # Makes explicit System.gc() calls run concurrently instead of stopping all threads
    "ExplicitGCInvokesConcurrent": "true"

Deploy the Kafka node pools

In this section, we create two Kafka node pools: one for brokers and one for controllers.

  • Apply the following YAML manifest to create the two Kafka node pools using the kubectl apply command. A quick validation check follows the manifest.

    kubectl apply -n kafka -f - <<EOF
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: controller
      labels:
        strimzi.io/cluster: kafka-aks-cluster
    spec:
      replicas: 3
      roles:
        - controller
      resources:
        requests:
          memory: 4Gi
        limits:
          memory: 6Gi
      template:
        pod:
          metadata:
            labels:
              kafkaRole: controller
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: app
                    operator: In
                    values:
                    - kafka
            podAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 100
                  podAffinityTerm:
                    labelSelector:
                      matchLabels:
                        kafkaRole: broker
                    topologyKey: kubernetes.io/hostname
          topologySpreadConstraints:
            - labelSelector:
                matchLabels:
                  kafkaRole: controller
              maxSkew: 1
              topologyKey: topology.kubernetes.io/zone
              whenUnsatisfiable: ScheduleAnyway
            - labelSelector:
                matchLabels:
                  kafkaRole: controller
              maxSkew: 1
              topologyKey: kubernetes.io/hostname
              whenUnsatisfiable: ScheduleAnyway
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 25Gi
            kraftMetadata: shared
            deleteClaim: false
            class: acstor-azuredisk-zr
      jvmOptions:
        "-Xms": "3g"
        "-Xmx": "3g"
        "-XX":
          "MetaspaceSize": "96m"
          "UseG1GC": "true"
          "MaxGCPauseMillis": "20"
          "InitiatingHeapOccupancyPercent": "35"
          "G1HeapRegionSize": "16M"
          "MinMetaspaceFreeRatio": "50"
          "MaxMetaspaceFreeRatio": "80"
          "ExplicitGCInvokesConcurrent": "true" 
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: broker
      labels:
        strimzi.io/cluster: kafka-aks-cluster
    spec:
      replicas: 3
      roles:
        - broker
      resources:
        requests:
          memory: 8Gi
        limits:
          memory: 10Gi
      template:
        pod:
          metadata:
            labels:
              kafkaRole: broker
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: app
                    operator: In
                    values:
                    - kafka
            podAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 100
                  podAffinityTerm:
                    labelSelector:
                      matchLabels:
                        kafkaRole: controller
                    topologyKey: kubernetes.io/hostname
          topologySpreadConstraints:
            - labelSelector:
                matchLabels:
                  kafkaRole: broker
              maxSkew: 1
              topologyKey: topology.kubernetes.io/zone
              whenUnsatisfiable: ScheduleAnyway
            - labelSelector:
                matchLabels:
                  kafkaRole: broker
              maxSkew: 1
              topologyKey: kubernetes.io/hostname
              whenUnsatisfiable: ScheduleAnyway 
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 50Gi
            deleteClaim: false
            class: acstor-azuredisk-zr
          - id: 1
            type: persistent-claim
            size: 25Gi
            kraftMetadata: shared
            deleteClaim: false
            class: acstor-azuredisk-zr
      jvmOptions:
        "-Xms": "6g"
        "-Xmx": "6g"
        "-XX":
          "MetaspaceSize": "96m"
          "UseG1GC": "true"
          "MaxGCPauseMillis": "20"
          "InitiatingHeapOccupancyPercent": "35"
          "G1HeapRegionSize": "16M"
          "MinMetaspaceFreeRatio": "50"
          "MaxMetaspaceFreeRatio": "80"
          "ExplicitGCInvokesConcurrent": "true"
    EOF
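
    At this point, the node pool resources exist but no pods are scheduled yet: Strimzi waits for the Kafka cluster resource (created in the next section) before provisioning brokers and controllers. You can confirm that the pools were accepted:

    kubectl get kafkanodepool -n kafka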
    

After you create the Kafka node pools, the next step is to define the Kafka cluster custom resource that ties the pools together into a functioning Kafka ecosystem. This architecture follows a separation-of-concerns pattern: the Kafka node pools manage infrastructure aspects, while the Kafka cluster resource handles application-level configuration.

Deploy the Kafka cluster

  1. Before creating the Kafka cluster, create the ConfigMaps containing the JMX Prometheus Exporter configuration using the kubectl apply command. These ConfigMaps define how Kafka's internal JMX metrics are transformed and exposed in Prometheus format, enabling comprehensive monitoring of the Kafka ecosystem. The patterns defined in this configuration map JMX metric paths to properly formatted Prometheus metrics with appropriate types and labels, as the worked example after the manifest shows.

    kubectl apply -n kafka -f - <<'EOF'
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-metrics
      labels:
        app: strimzi
    data:
      kafka-metrics-config.yaml: |
        # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
        lowercaseOutputName: true
        rules:
        # Special cases and very specific rules
        - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
          name: kafka_server_$1_$2
          type: GAUGE
          labels:
            clientId: "$3"
            topic: "$4"
            partition: "$5"
        - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
          name: kafka_server_$1_$2
          type: GAUGE
          labels:
            clientId: "$3"
            broker: "$4:$5"
        - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
          name: kafka_server_$1_connections_tls_info
          type: GAUGE
          labels:
            cipher: "$2"
            protocol: "$3"
            listener: "$4"
            networkProcessor: "$5"
        - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
          name: kafka_server_$1_connections_software
          type: GAUGE
          labels:
            clientSoftwareName: "$2"
            clientSoftwareVersion: "$3"
            listener: "$4"
            networkProcessor: "$5"
        - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total):"
          name: kafka_server_$1_$4
          type: COUNTER
          labels:
            listener: "$2"
            networkProcessor: "$3"
        - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
          name: kafka_server_$1_$4
          type: GAUGE
          labels:
            listener: "$2"
            networkProcessor: "$3"
        - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total)
          name: kafka_server_$1_$4
          type: COUNTER
          labels:
            listener: "$2"
            networkProcessor: "$3"
        - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
          name: kafka_server_$1_$4
          type: GAUGE
          labels:
            listener: "$2"
            networkProcessor: "$3"
        # Some percent metrics use MeanRate attribute
        # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
        - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
          name: kafka_$1_$2_$3_percent
          type: GAUGE
        # Generic gauges for percents
        - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
          name: kafka_$1_$2_$3_percent
          type: GAUGE
        - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
          name: kafka_$1_$2_$3_percent
          type: GAUGE
          labels:
            "$4": "$5"
        # Generic per-second counters with 0-2 key/value pairs
        - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
          name: kafka_$1_$2_$3_total
          type: COUNTER
          labels:
            "$4": "$5"
            "$6": "$7"
        - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
          name: kafka_$1_$2_$3_total
          type: COUNTER
          labels:
            "$4": "$5"
        - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
          name: kafka_$1_$2_$3_total
          type: COUNTER
        # Generic gauges with 0-2 key/value pairs
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
          name: kafka_$1_$2_$3
          type: GAUGE
          labels:
            "$4": "$5"
            "$6": "$7"
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
          name: kafka_$1_$2_$3
          type: GAUGE
          labels:
            "$4": "$5"
        - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
          name: kafka_$1_$2_$3
          type: GAUGE
        # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
        # Note that these are missing the '_sum' metric!
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
          name: kafka_$1_$2_$3_count
          type: COUNTER
          labels:
            "$4": "$5"
            "$6": "$7"
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
          name: kafka_$1_$2_$3
          type: GAUGE
          labels:
            "$4": "$5"
            "$6": "$7"
            quantile: "0.$8"
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
          name: kafka_$1_$2_$3_count
          type: COUNTER
          labels:
            "$4": "$5"
        - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
          name: kafka_$1_$2_$3
          type: GAUGE
          labels:
            "$4": "$5"
            quantile: "0.$6"
        - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
          name: kafka_$1_$2_$3_count
          type: COUNTER
        - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
          name: kafka_$1_$2_$3
          type: GAUGE
          labels:
            quantile: "0.$4"
        # KRaft overall related metrics
        # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
        - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
          name: kafka_server_raftmetrics_$1
          type: COUNTER
        - pattern: "kafka.server<type=raft-metrics><>(current-state): (.+)"
          name: kafka_server_raftmetrics_$1
          value: 1
          type: UNTYPED
          labels:
            $1: "$2"
        - pattern: "kafka.server<type=raft-metrics><>(.+):"
          name: kafka_server_raftmetrics_$1
          type: GAUGE
        # KRaft "low level" channels related metrics
        # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
        - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
          name: kafka_server_raftchannelmetrics_$1
          type: COUNTER
        - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
          name: kafka_server_raftchannelmetrics_$1
          type: GAUGE
        # Broker metrics related to fetching metadata topic records in KRaft mode
        - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
          name: kafka_server_brokermetadatametrics_$1
          type: GAUGE
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: cruise-control-metrics
      labels:
        app: strimzi
    data:
      metrics-config.yaml: |
        # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
        lowercaseOutputName: true
        rules:
        - pattern: kafka.cruisecontrol<name=(.+)><>(\w+)
          name: kafka_cruisecontrol_$1_$2
          type: GAUGE
    EOF
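
    To illustrate how these rules work: a broker MBean such as kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count matches the generic per-second counter pattern kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count, so with lowercaseOutputName: true it is exported as a COUNTER named:

    kafka_server_brokertopicmetrics_messagesin_total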
    
  2. Deploy the Kafka cluster resource using the kubectl apply command, connecting the previously created node pools into a complete Kafka ecosystem. This custom resource configures several key components:

    • Kafka core configuration: Defines replication factors, listener settings, and other Kafka-specific parameters.
    • Cruise Control: Provides automated cluster balancing and monitoring capabilities.
    • Entity Operator: Deploys the Topic Operator and User Operator, which declaratively manage Kafka topics and users through Kubernetes resources.
    • JMX metrics: Configures metrics exposure using the previously defined ConfigMaps.

    kubectl apply -n kafka -f - <<EOF
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-aks-cluster
      annotations:
        strimzi.io/node-pools: enabled
        strimzi.io/kraft: enabled
    spec:
      kafka:
        version: 3.9.0
        metadataVersion: 3.9-IV0
        rack:
          topologyKey: topology.kubernetes.io/zone
        template:
          podDisruptionBudget:
            maxUnavailable: 2
        listeners:
          - name: internal
            port: 9092
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          default.replication.factor: 3
          min.insync.replicas: 2
          log.segment.bytes: 1073741824  
          log.retention.hours: 168  
          log.retention.check.interval.ms: 300000 
        metricsConfig:
          type: jmxPrometheusExporter
          valueFrom:
            configMapKeyRef:
              name: kafka-metrics
              key: kafka-metrics-config.yaml
      cruiseControl:
        metricsConfig:
          type: jmxPrometheusExporter
          valueFrom:
            configMapKeyRef:
              name: cruise-control-metrics
              key: metrics-config.yaml
      entityOperator:
        topicOperator: {}
        userOperator: {}
    EOF
    
  3. After deployment, verify the Kafka deployment by checking that all KafkaNodePools, the Kafka cluster resource, and their corresponding pods were created and are in a running state using the kubectl get command.

    kubectl get pods,kafkanodepool,kafka -n kafka
    

    The output should look similar to the following example output:

    NAME                                                     READY   STATUS    RESTARTS   AGE
    pod/kafka-aks-cluster-broker-0                           1/1     Running   0          1d22h
    pod/kafka-aks-cluster-broker-1                           1/1     Running   0          1d22h
    pod/kafka-aks-cluster-broker-2                           1/1     Running   0          1d22h
    pod/kafka-aks-cluster-controller-3                       1/1     Running   0          1d22h
    pod/kafka-aks-cluster-controller-4                       1/1     Running   0          1d22h
    pod/kafka-aks-cluster-controller-5                       1/1     Running   0          1d22h
    pod/kafka-aks-cluster-cruise-control-844b69848-87rf6     1/1     Running   0          1d22h
    pod/kafka-aks-cluster-entity-operator-6f949f6774-t8wql   2/2     Running   0          1d22h
    
    NAME                                        DESIRED REPLICAS   ROLES            NODEIDS
    kafkanodepool.kafka.strimzi.io/broker       3                  ["broker"]       [0,1,2]
    kafkanodepool.kafka.strimzi.io/controller   3                  ["controller"]   [3,4,5]
    
    NAME                                       DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   METADATA STATE   WARNINGS
    kafka.kafka.strimzi.io/kafka-aks-cluster 
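
    You can also block until the cluster reports readiness, which is useful in scripts and CI pipelines:

    kubectl wait kafka/kafka-aks-cluster --for=condition=Ready --timeout=600s -n kafka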
    

Create Kafka users and topics

The Strimzi Entity Operator, deployed through the Kafka cluster custom resource, translates Kubernetes custom resources (KafkaTopic and KafkaUser) into actual Kafka resources, enabling GitOps workflows and consistent configuration management.

Note

Using the Entity Operator to create Kafka topics and users declaratively is optional. You can also create them using traditional Kafka CLI tools or APIs. However, the declarative approach offers benefits such as version control, audit trails, and consistent management across environments.

  1. Create a Kafka topic with the Topic Operator using the kubectl apply command.

    kubectl apply -n kafka -f - << EOF  
    apiVersion: kafka.strimzi.io/v1beta2  
    kind: KafkaTopic  
    metadata:  
      name: test-topic  
      labels:  
        strimzi.io/cluster: kafka-aks-cluster 
    spec:  
      replicas: 3  
      partitions: 4  
      config:  
        retention.ms: 7200000  
        segment.bytes: 1073741824  
    EOF  
    
  2. Verify that the Kafka topic was created successfully using the kubectl get command.

    kubectl get kafkatopic -n kafka  
    

    The output should look similar to the following example output:

    NAME         CLUSTER             PARTITIONS   REPLICATION FACTOR   READY  
    test-topic   kafka-aks-cluster   4            3                    True  
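
    Because the Topic Operator reconciles continuously, later changes to the KafkaTopic resource are applied to the live topic as well. An illustrative example that raises the retention from two hours to four:

    kubectl patch kafkatopic test-topic -n kafka --type merge -p '{"spec":{"config":{"retention.ms":"14400000"}}}'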
    

For more information, see Manage Kafka topics using the Topic Operator.

  1. Create a Kafka user with the User Operator using the kubectl apply command.

    kubectl apply -f - <<EOF  
    apiVersion: kafka.strimzi.io/v1beta2  
    kind: KafkaUser  
    metadata:  
      name: test-user  
      labels:  
        strimzi.io/cluster: kafka-aks-cluster  
    spec:  
      authentication:  
        type: tls  
      authorization:  
        type: simple  
        acls:  
          - resource:  
              type: topic  
              name: test-topic  
              patternType: literal  
            operations:  
              - Describe  
              - Read  
            host: "*"  
          - resource:  
              type: group  
              name: test-group  
              patternType: literal  
            operations:  
              - Read  
            host: "*"  
          - resource:  
              type: topic  
              name: test-topic  
              patternType: literal  
            operations:  
              - Create  
              - Describe  
              - Write  
            host: "*"  
    EOF  
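
    For clients connecting to the TLS-enabled internal listener on port 9092, the User Operator issues client credentials in a Kubernetes Secret named after the user. A minimal sketch for retrieving them, assuming the default Strimzi secret layout:

    # Client certificate and key issued for test-user by the User Operator
    kubectl get secret test-user -n kafka -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
    kubectl get secret test-user -n kafka -o jsonpath='{.data.user\.key}' | base64 -d > user.key

    # Cluster CA certificate that clients use to trust the brokers
    kubectl get secret kafka-aks-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt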
    

For more information, see Manage Kafka users using the User Operator.

Next steps

Contributors

Microsoft maintains this article. The following contributors originally wrote it:

  • Sergio Navar | Senior Customer Engineer
  • Erin Schaffer | Content Developer 2