Configure and deploy Airflow on Azure Kubernetes Service (AKS)

This article shows you how to configure and deploy Apache Airflow on Azure Kubernetes Service (AKS) using Helm.

Configure workload identity

  1. Create a namespace for the Airflow cluster using the kubectl create namespace command.

    kubectl create namespace ${AKS_AIRFLOW_NAMESPACE} --dry-run=client --output yaml | kubectl apply -f -
    

    Example output:

    namespace/airflow created
    
  2. Create a service account and configure workload identity using the kubectl apply command.

    export TENANT_ID=$(az account show --query tenantId -o tsv)
    cat <<EOF | kubectl apply -f -
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      annotations:
        azure.workload.identity/client-id: "${MY_IDENTITY_NAME_CLIENT_ID}"
        azure.workload.identity/tenant-id: "${TENANT_ID}"
      name: "${SERVICE_ACCOUNT_NAME}"
      namespace: "${AKS_AIRFLOW_NAMESPACE}"
    EOF
    

    Example output:

    serviceaccount/airflow created
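
    Optionally, confirm that the service account carries the workload identity annotations before moving on:

    # The output should show the azure.workload.identity/client-id and tenant-id annotations applied above.
    kubectl get serviceaccount ${SERVICE_ACCOUNT_NAME} \
        --namespace ${AKS_AIRFLOW_NAMESPACE} \
        --output yaml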
    

Install the External Secrets Operator

In this section, we use Helm to install the External Secrets Operator. The External Secrets Operator is a Kubernetes operator that manages the lifecycle of secrets stored in external secret stores such as Azure Key Vault.

  1. Add the External Secrets Helm repository and update it using the helm repo add and helm repo update commands.

    helm repo add external-secrets https://charts.external-secrets.io
    helm repo update
    

    Example output:

    Hang tight while we grab the latest from your chart repositories...
    ...Successfully got an update from the "external-secrets" chart repository
    
  2. Install the External Secrets Operator using the helm install command.

    helm install external-secrets \
    external-secrets/external-secrets \
    --namespace ${AKS_AIRFLOW_NAMESPACE} \
    --create-namespace \
    --set installCRDs=true \
    --wait
    

    Example output:

    NAME: external-secrets
    LAST DEPLOYED: Thu Nov  7 11:16:07 2024
    NAMESPACE: airflow
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    external-secrets has been deployed successfully in namespace airflow!
    
    In order to begin using ExternalSecrets, you will need to set up a SecretStore
    or ClusterSecretStore resource (for example, by creating a 'vault' SecretStore).
    
    More information on the different types of SecretStores and how to configure them
    can be found in our Github: https://github.com/external-secrets/external-secrets
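
    Optionally, verify that the operator pods are running before continuing; the label selector below assumes the chart's default app.kubernetes.io/instance label:

    # List the pods deployed by the external-secrets chart release.
    kubectl get pods --namespace ${AKS_AIRFLOW_NAMESPACE} -l app.kubernetes.io/instance=external-secrets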
    

Create secrets

  1. Create a SecretStore resource to access the Airflow passwords stored in your key vault using the kubectl apply command.

    kubectl apply -f - <<EOF
    apiVersion: external-secrets.io/v1beta1
    kind: SecretStore
    metadata:
      name: azure-store
      namespace: ${AKS_AIRFLOW_NAMESPACE}
    spec:
      provider:
        # provider type: azure keyvault
        azurekv:
          authType: WorkloadIdentity
          vaultUrl: "${KEYVAULTURL}"
          serviceAccountRef:
            name: ${SERVICE_ACCOUNT_NAME}
    EOF
    

    Example output:

    secretstore.external-secrets.io/azure-store created
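
    Before creating the ExternalSecret in the next step, you can optionally wait until the operator validates the store; a minimal check, assuming the standard Ready condition on external-secrets.io resources:

    # Block until the operator marks the SecretStore as Ready (or time out after two minutes).
    kubectl wait secretstore azure-store --namespace ${AKS_AIRFLOW_NAMESPACE} --for=condition=Ready --timeout=120s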
    
  2. Create an ExternalSecret resource using the kubectl apply command. This resource uses the secrets stored in your key vault to create a Kubernetes Secret in the airflow namespace.

    kubectl apply -f - <<EOF
    apiVersion: external-secrets.io/v1beta1
    kind: ExternalSecret
    metadata:
      name: airflow-aks-azure-logs-secrets
      namespace: ${AKS_AIRFLOW_NAMESPACE}
    spec:
      refreshInterval: 1h
      secretStoreRef:
        kind: SecretStore
        name: azure-store
    
      target:
        name: ${AKS_AIRFLOW_LOGS_STORAGE_SECRET_NAME}
        creationPolicy: Owner
    
      data:
        # name of the SECRET in the Azure KV (no prefix is by default a SECRET)
        - secretKey: azurestorageaccountname
          remoteRef:
            key: AKS-AIRFLOW-LOGS-STORAGE-ACCOUNT-NAME
        - secretKey: azurestorageaccountkey
          remoteRef:
            key: AKS-AIRFLOW-LOGS-STORAGE-ACCOUNT-KEY
    EOF
    

    Example output:

    externalsecret.external-secrets.io/airflow-aks-azure-logs-secrets created
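
    Once the ExternalSecret reconciles, the operator creates the target Kubernetes Secret. An optional check that both expected keys were synced:

    # The synced Secret should expose the azurestorageaccountname and azurestorageaccountkey keys.
    kubectl get secret ${AKS_AIRFLOW_LOGS_STORAGE_SECRET_NAME} --namespace ${AKS_AIRFLOW_NAMESPACE} --output jsonpath='{.data}'; echo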
    
  3. Create a federated credential using the az identity federated-credential create command.

    az identity federated-credential create \
        --name external-secret-operator \
        --identity-name ${MY_IDENTITY_NAME} \
        --resource-group ${MY_RESOURCE_GROUP_NAME} \
        --issuer ${OIDC_URL} \
        --subject system:serviceaccount:${AKS_AIRFLOW_NAMESPACE}:${SERVICE_ACCOUNT_NAME} \
        --output table
    

    Example output:

    Issuer                                                                                                                   Name                      ResourceGroup            Subject
    -----------------------------------------------------------------------------------------------------------------------  ------------------------  -----------------------  -------------------------------------
    https://$MY_LOCATION.oic.prod-aks.azure.com/c2c2c2c2-dddd-eeee-ffff-a3a3a3a3a3a3/aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e/  external-secret-operator  $MY_RESOURCE_GROUP_NAME  system:serviceaccount:airflow:airflow
    
  4. Grant the user-assigned identity permission to access the secret using the az keyvault set-policy command.

    az keyvault set-policy --name $MY_KEYVAULT_NAME --object-id $MY_IDENTITY_NAME_PRINCIPAL_ID --secret-permissions get --output table
    

    Example output:

    Location       Name                    ResourceGroup
    -------------  ----------------------  -----------------------
    $MY_LOCATION   $MY_KEYVAULT_NAME       $MY_RESOURCE_GROUP_NAME
    

Create a persistent volume for Apache Airflow logs

  • Create a persistent volume using the kubectl apply command.

    kubectl apply -f - <<EOF
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: pv-airflow-logs
      labels:
        type: local
    spec:
      capacity:
        storage: 5Gi
      accessModes:
        - ReadWriteMany
      persistentVolumeReclaimPolicy: Retain # If set as "Delete" container would be removed after pvc deletion
      storageClassName: azureblob-fuse-premium
      mountOptions:
        - -o allow_other
        - --file-cache-timeout-in-seconds=120
      csi:
        driver: blob.csi.azure.com
        readOnly: false
        volumeHandle: airflow-logs-1
        volumeAttributes:
          resourceGroup: ${MY_RESOURCE_GROUP_NAME}
          storageAccount: ${AKS_AIRFLOW_LOGS_STORAGE_ACCOUNT_NAME}
          containerName: ${AKS_AIRFLOW_LOGS_STORAGE_CONTAINER_NAME}
        nodeStageSecretRef:
          name: ${AKS_AIRFLOW_LOGS_STORAGE_SECRET_NAME}
          namespace: ${AKS_AIRFLOW_NAMESPACE}
    EOF
    

    Example output:

    persistentvolume/pv-airflow-logs created
    

Create a persistent volume claim for Apache Airflow logs

  • Create a persistent volume claim using the kubectl apply command.

    kubectl apply -f - <<EOF
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: pvc-airflow-logs
      namespace: ${AKS_AIRFLOW_NAMESPACE}
    spec:
      storageClassName: azureblob-fuse-premium
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
      volumeName: pv-airflow-logs
    EOF
    

    Example output:

    persistentvolumeclaim/pvc-airflow-logs created
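
    Because the claim references the volume by name in volumeName, binding should complete immediately. An optional check that both sides report Bound:

    # Both resources should show STATUS "Bound" once the claim attaches to the volume.
    kubectl get persistentvolume pv-airflow-logs
    kubectl get persistentvolumeclaim pvc-airflow-logs --namespace ${AKS_AIRFLOW_NAMESPACE}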
    

Deploy Apache Airflow using Helm

  1. Create an airflow_values.yaml file to change the chart's default deployment configuration and update the container registry for the images.

    cat <<EOF> airflow_values.yaml
    
    images:
      airflow:
        repository: $MY_ACR_REGISTRY.azurecr.cn/airflow
        tag: 2.9.3
        # Specifying digest takes precedence over tag.
        digest: ~
        pullPolicy: IfNotPresent
      # To avoid images with user code, you can turn this to 'true' and
      # all the 'run-airflow-migrations' and 'wait-for-airflow-migrations' containers/jobs
      # will use the images from 'defaultAirflowRepository:defaultAirflowTag' values
      # to run and wait for DB migrations.
      useDefaultImageForMigration: false
      # timeout (in seconds) for airflow-migrations to complete
      migrationsWaitTimeout: 60
      pod_template:
        # Note that `images.pod_template.repository` and `images.pod_template.tag` parameters
        # can be overridden in `config.kubernetes` section. So for these parameters to have effect
        # `config.kubernetes.worker_container_repository` and `config.kubernetes.worker_container_tag`
        # must not be set.
        repository: $MY_ACR_REGISTRY.azurecr.cn/airflow
        tag: 2.9.3
        pullPolicy: IfNotPresent
      flower:
        repository: $MY_ACR_REGISTRY.azurecr.cn/airflow
        tag: 2.9.3
        pullPolicy: IfNotPresent
      statsd:
        repository: $MY_ACR_REGISTRY.azurecr.cn/statsd-exporter
        tag: v0.26.1
        pullPolicy: IfNotPresent
      pgbouncer:
        repository: $MY_ACR_REGISTRY.azurecr.cn/airflow
        tag: airflow-pgbouncer-2024.01.19-1.21.0
        pullPolicy: IfNotPresent
      pgbouncerExporter:
        repository: $MY_ACR_REGISTRY.azurecr.cn/airflow
        tag: airflow-pgbouncer-exporter-2024.06.18-0.17.0
        pullPolicy: IfNotPresent
      gitSync:
        repository: $MY_ACR_REGISTRY.azurecr.cn/git-sync
        tag: v4.1.0
        pullPolicy: IfNotPresent
    
    
    # Airflow executor
    executor: "KubernetesExecutor"
    
    # Environment variables for all airflow containers
    env:
      - name: ENVIRONMENT
        value: dev
    
    extraEnv: |
      - name: AIRFLOW__CORE__DEFAULT_TIMEZONE
        value: 'Asia/Shanghai'
    
    # Configuration for postgresql subchart
    # Not recommended for production! Instead, spin up your own Postgresql server and use the `data` attribute in this
    # yaml file.
    postgresql:
      enabled: true
    
    # Enable pgbouncer. See https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#pgbouncer
    pgbouncer:
      enabled: true
    
    dags:
      gitSync:
        enabled: true
        repo: https://github.com/donhighmsft/airflowexamples.git
        branch: main
        rev: HEAD
        depth: 1
        maxFailures: 0
        subPath: "dags"
        # sshKeySecret: airflow-git-ssh-secret
        # knownHosts: |
        #   github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=
    
    logs:
      persistence:
        enabled: true
        existingClaim: pvc-airflow-logs
        storageClassName: azureblob-fuse-premium
    
    # We disable the log groomer sidecar because we use Azure Blob Storage for logs, with a lifecycle policy set.
    triggerer:
      logGroomerSidecar:
        enabled: false
    
    scheduler:
      logGroomerSidecar:
        enabled: false
    
    workers:
      logGroomerSidecar:
        enabled: false
    
    EOF
    
  2. Add the Apache Airflow Helm repository and update it using the helm repo add and helm repo update commands.

    helm repo add apache-airflow https://airflow.apache.org
    helm repo update
    

    Example output:

    "apache-airflow" has been added to your repositories
    Hang tight while we grab the latest from your chart repositories...
    ...Successfully got an update from the "apache-airflow" chart repository
    
  3. Search for the Apache Airflow chart in the Helm repository using the helm search repo command.

    helm search repo airflow
    

    Example output:

    NAME                    CHART VERSION   APP VERSION     DESCRIPTION
    apache-airflow/airflow  1.15.0          2.9.3           The official Helm chart to deploy Apache Airflo...
    
  4. Install the Apache Airflow chart using the helm install command.

    helm install airflow apache-airflow/airflow --namespace airflow --create-namespace -f airflow_values.yaml --debug
    

    Example output:

    NAME: airflow
    LAST DEPLOYED: Fri Nov  8 11:59:43 2024
    NAMESPACE: airflow
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Thank you for installing Apache Airflow 2.9.3!
    
    Your release is named airflow.
    You can now access your dashboard(s) by executing the following command(s) and visiting the corresponding port at localhost in your browser:
    
    Airflow Webserver:     kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow
    Default Webserver (Airflow UI) Login credentials:
        username: admin
        password: admin
    Default Postgres connection credentials:
        username: postgres
        password: postgres
        port: 5432
    
    You can get Fernet Key value by running the following:
    
        echo Fernet Key: $(kubectl get secret --namespace airflow airflow-fernet-key -o jsonpath="{.data.fernet-key}" | base64 --decode)
    
    ###########################################################
    #  WARNING: You should set a static webserver secret key  #
    ###########################################################
    
    You are using a dynamically generated webserver secret key, which can lead to
    unnecessary restarts of your Airflow components.
    
    Information on how to set a static webserver secret key can be found here:
    https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#webserver-secret-key
    
  5. Verify the installation using the kubectl get pods command.

    kubectl get pods -n airflow
    

    Example output:

    NAME                                                READY   STATUS      RESTARTS   AGE
    airflow-create-user-kklqf                           1/1     Running     0          12s
    airflow-pgbouncer-d7bf9f649-25fnt                   2/2     Running     0          61s
    airflow-postgresql-0                                1/1     Running     0          61s
    airflow-run-airflow-migrations-zns2b                0/1     Completed   0          60s
    airflow-scheduler-5c45c6dbdd-7t6hv                  1/2     Running     0          61s
    airflow-statsd-6df8564664-6rbw8                     1/1     Running     0          61s
    airflow-triggerer-0                                 2/2     Running     0          61s
    airflow-webserver-7df76f944c-vcd5s                  0/1     Running     0          61s
    external-secrets-748f44c8b8-w7qrk                   1/1     Running     0          3h6m
    external-secrets-cert-controller-57b9f4cb7c-vl4m8   1/1     Running     0          3h6m
    external-secrets-webhook-5954b69786-69rlp           1/1     Running     0          3h6m
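
The install notes above warn about the dynamically generated webserver secret key. The following is a minimal sketch of pinning a static key per the linked production guide; the secret name airflow-webserver-secret-key is an arbitrary choice, and webserverSecretKeySecretName is the chart value that guide documents:

    # Create a Secret holding a random static key, then point the chart at it and upgrade the release.
    kubectl create secret generic airflow-webserver-secret-key \
        --namespace airflow \
        --from-literal="webserver-secret-key=$(python3 -c 'import secrets; print(secrets.token_hex(16))')"

    # In airflow_values.yaml, add the line below, then re-run the deployment:
    #   webserverSecretKeySecretName: airflow-webserver-secret-key
    helm upgrade airflow apache-airflow/airflow --namespace airflow -f airflow_values.yaml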
    

Access the Airflow UI

  1. Securely access the Airflow UI through port forwarding using the kubectl port-forward command.

    kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow
    
  2. Open your browser and navigate to localhost:8080 to access the Airflow UI.

  3. Log in with the default webserver URL and login credentials provided during the Airflow Helm chart installation.

  4. Securely browse and manage your workflows through the Airflow UI.

Integrate Git with Airflow

Integrating Git with Apache Airflow enables seamless version control and streamlined management of your workflow definitions, ensuring that all DAGs are organized and easily auditable.

  1. Set up a Git repository for your DAGs. Create a dedicated Git repository to house all your Airflow DAG definitions. This repository serves as the central source of truth for your workflows, allowing you to manage, track, and collaborate on DAGs effectively.
  2. Configure Airflow to sync DAGs from Git. Update Airflow's configuration to automatically pull DAGs from your Git repository by setting the Git repository URL and any required authentication credentials directly in Airflow's configuration files or through Helm chart values, as shown in the sketch after this list. This setup enables automated synchronization of DAGs, so Airflow always stays up to date with the latest version of your workflows.
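
As an illustration of step 2, the git-sync settings already present in airflow_values.yaml can point at a private repository over SSH. This sketch reuses the airflow-git-ssh-secret name from the commented-out placeholder in the values file above; the key path and repository URL are assumptions to replace with your own:

    # Store the private deploy key that git-sync uses to clone the DAG repository.
    kubectl create secret generic airflow-git-ssh-secret \
        --namespace airflow \
        --from-file=gitSshKey=${HOME}/.ssh/id_rsa

    # Then, in airflow_values.yaml, switch the repo to SSH and reference the secret:
    # dags:
    #   gitSync:
    #     enabled: true
    #     repo: git@github.com:<your-org>/<your-dags-repo>.git
    #     branch: main
    #     subPath: "dags"
    #     sshKeySecret: airflow-git-ssh-secret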

This integration enhances the development and deployment workflow by bringing in full version control, enabling rollbacks, and supporting team collaboration in a production-grade setup.

Make Airflow on Kubernetes production-grade

The following best practices can help you make your Apache Airflow deployment on Kubernetes production-grade:

  • Ensure you have a robust setup focused on scalability, security, and reliability.
  • Use dedicated, autoscaling nodes, and choose a resilient executor such as KubernetesExecutor, CeleryExecutor, or CeleryKubernetesExecutor.
  • Use a managed, highly available database backend such as MySQL or PostgreSQL.
  • Establish comprehensive monitoring and centralized logging to maintain performance insights.
  • Secure the environment with network policies (see the sketch after this list), SSL, and role-based access control (RBAC), and configure Airflow components (scheduler, webserver, workers) for high availability.
  • Implement CI/CD pipelines for smooth DAG deployment, and set up regular backups for disaster recovery.
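
As one concrete instance of the network policy recommendation, the sketch below admits only in-namespace traffic to the webserver port. The component: webserver pod label is an assumption based on the chart's labeling conventions, so verify the labels on your release before applying:

    # Restrict ingress to the Airflow webserver to pods within the airflow namespace.
    kubectl apply -f - <<EOF
    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: airflow-webserver-ingress
      namespace: airflow
    spec:
      podSelector:
        matchLabels:
          component: webserver   # assumed label; check with kubectl get pods --show-labels
      policyTypes:
        - Ingress
      ingress:
        - from:
            - podSelector: {}
          ports:
            - protocol: TCP
              port: 8080
    EOF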

Next steps

To learn more about deploying open-source software on Azure Kubernetes Service (AKS), see the following articles:

Contributors

Microsoft maintains this article. The following contributors originally wrote it:

  • Don High | Principal Customer Engineer
  • Satya Chandragiri | Senior Digital Cloud Solution Architect
  • Erin Schaffer | Content Developer 2