使用 Terraform 创建群集、笔记本和作业

本文介绍如何使用 Databricks Terraform 提供程序 在现有 Azure Databricks 工作区中创建群集笔记本作业

本文是以下 Azure Databricks 入门文章的补充文章:

还可以调整本文中的 Terraform 配置,在工作区中创建自定义群集、笔记本和作业。

步骤 1:创建并配置 Terraform 项目

  1. 按照 Databricks Terraform 提供程序概述文章的要求部分中的说明创建 Terraform 项目。

  2. 若要创建群集,请创建一个名为 cluster.tf 的文件,并将以下内容添加到该文件。 此内容创建一个群集,其中包含至少需包含的资源量。 此群集使用最新的 Databricks Runtime 长期支持 (LTS) 版本。

    对于要使用 Unity Catalog 的群集:

    variable "cluster_name" {}
    variable "cluster_autotermination_minutes" {}
    variable "cluster_num_workers" {}
    variable "cluster_data_security_mode" {}
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
      data_security_mode      = var.cluster_data_security_mode
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    

    对于通用群集:

    variable "cluster_name" {
      description = "A name for the cluster."
      type        = string
      default     = "My Cluster"
    }
    
    variable "cluster_autotermination_minutes" {
      description = "How many minutes before automatically terminating due to inactivity."
      type        = number
      default     = 60
    }
    
    variable "cluster_num_workers" {
      description = "The number of workers."
      type        = number
      default     = 1
    }
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    
  3. 若要创建群集,请创建另一个名为 cluster.auto.tfvars 的文件,并将以下内容添加到该文件。 此文件包含用于自定义群集的变量值。 将占位符值替换为你自己的值。

    对于要使用 Unity Catalog 的群集:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    cluster_data_security_mode      = "SINGLE_USER"
    

    对于通用群集:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    
  4. 若要创建笔记本,请创建另一个名为 notebook.tf 的文件,并将以下内容添加到该文件:

    variable "notebook_subdirectory" {
      description = "A name for the subdirectory to store the notebook."
      type        = string
      default     = "Terraform"
    }
    
    variable "notebook_filename" {
      description = "The notebook's filename."
      type        = string
    }
    
    variable "notebook_language" {
      description = "The language of the notebook."
      type        = string
    }
    
    resource "databricks_notebook" "this" {
      path     = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}"
      language = var.notebook_language
      source   = "./${var.notebook_filename}"
    }
    
    output "notebook_url" {
     value = databricks_notebook.this.url
    }
    
  5. 如果要创建群集,请将以下笔记本代码保存到文件 notebook.tf 所在的同一目录的某文件中:

    对于教程:运行端到端湖屋分析管道的 Python 笔记本,是包含以下内容、名为 notebook-getting-started-lakehouse-e2e.py 的文件:

    # Databricks notebook source
    external_location = "<your_external_location>"
    catalog = "<your_catalog>"
    
    dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
    display(dbutils.fs.head(f"{external_location}/foobar.txt"))
    dbutils.fs.rm(f"{external_location}/foobar.txt")
    
    display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
    
    # COMMAND ----------
    
    from pyspark.sql.functions import col
    
    # Set parameters for isolation in workspace and reset demo
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"{catalog}.e2e_lakehouse_{username}_db"
    source = f"{external_location}/e2e-lakehouse-source"
    table = f"{database}.target_table"
    checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    # Clear out data from previous demo execution
    dbutils.fs.rm(source, True)
    dbutils.fs.rm(checkpoint_path, True)
    
    # Define a class to load batches of data to source
    class LoadData:
    
      def __init__(self, source):
        self.source = source
    
      def get_date(self):
        try:
          df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
          raise Exception("Source data exhausted")
          return batch_date
    
      def get_batch(self, batch_date):
        return (
          spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )
    
      def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)
    
      def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)
    
    RawData = LoadData(source)
    
    # COMMAND ----------
    
    RawData.land_batch()
    
    # COMMAND ----------
    
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .option("mergeSchema", "true")
      .toTable(table))
    
    # COMMAND ----------
    
    df = spark.read.table(table_name)
    
    # COMMAND ----------
    
    display(df)
    

    对于快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业的 Python 笔记本,是包含以下内容、名为 notebook-quickstart-create-databricks-workspace-portal.py 的文件:

    # Databricks notebook source
    blob_account_name = "azureopendatastorage"
    blob_container_name = "citydatacontainer"
    blob_relative_path = "Safety/Release/city=Seattle"
    blob_sas_token = r""
    
    # COMMAND ----------
    
    wasbs_path = 'wasbs://%s@%s.blob.core.chinacloudapi.cn/%s' % (blob_container_name, blob_account_name,blob_relative_path)
    spark.conf.set('fs.azure.sas.%s.%s.blob.core.chinacloudapi.cn' % (blob_container_name, blob_account_name), blob_sas_token)
    print('Remote blob path: ' + wasbs_path)
    
    # COMMAND ----------
    
    df = spark.read.parquet(wasbs_path)
    print('Register the DataFrame as a SQL temporary view: source')
    df.createOrReplaceTempView('source')
    
    # COMMAND ----------
    
    print('Displaying top 10 rows: ')
    display(spark.sql('SELECT * FROM source LIMIT 10'))
    
  6. 如果要创建笔记本,请创建另一个名为 notebook.auto.tfvars 的文件,并将以下内容添加到该文件。 此文件包含用于自定义笔记本配置的变量值。

    对于教程:运行端到端湖屋分析管道的 Python 笔记本:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-lakehouse-e2e.py"
    notebook_language     = "PYTHON"
    

    对于快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业的 Python 笔记本:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-quickstart-create-databricks-workspace-portal.py"
    notebook_language     = "PYTHON"
    
  7. 如果要在 Azure Databricks 工作区中创建笔记本,请务必参考以下说明、设置笔记本成功运行的任何要求:

  8. 若要创建作业,请创建另一个名为 job.tf 的文件,并将以下内容添加到该文件。 此内容创建用于运行笔记本的作业。

    variable "job_name" {
      description = "A name for the job."
      type        = string
      default     = "My Job"
    }
    
    variable "task_key" {
      description = "A name for the task."
      type        = string
      default     = "my_task"
    }
    
    resource "databricks_job" "this" {
      name = var.job_name
      task {
        task_key = var.task_key
        existing_cluster_id = databricks_cluster.this.cluster_id
        notebook_task {
          notebook_path = databricks_notebook.this.path
        }
      }
      email_notifications {
        on_success = [ data.databricks_current_user.me.user_name ]
        on_failure = [ data.databricks_current_user.me.user_name ]
      }
    }
    
    output "job_url" {
      value = databricks_job.this.url
    }
    
  9. 如果要创建作业,请创建另一个名为 job.auto.tfvars 的文件,并将以下内容添加到该文件。 此文件包含用于自定义作业配置的变量值。

    job_name = "My Job"
    task_key = "my_task"
    

步骤 2:运行配置

在此步骤中,将运行 Terraform 配置,将群集、笔记本和作业部署到 Azure Databricks 工作区。

  1. 通过运行 terraform validate 命令检查 Terraform 配置是否有效。 如果报告了任何错误,请解决错误,然后再次运行命令。

    terraform validate
    
  2. 通过运行 terraform plan 命令,检查 Terraform 在工作区中会执行哪些操作(在 Terraform 实际执行这些操作之前)。

    terraform plan
    
  3. 通过运行 terraform apply 命令,将群集、笔记本和作业部署到工作区中。 提示部署时,键入 yes,然后按 Enter。

    terraform apply
    

    Terraform 会部署项目中指定的资源。 部署这些资源(特别是群集)可能需要几分钟时间。

步骤 3:浏览结果

  1. 如果创建了群集,请在 terraform apply 命令的输出中复制 cluster_url 旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。

  2. 如果创建了笔记本,请在 terraform apply 命令的输出中复制 notebook_url 旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。

    注意

    在使用笔记本之前,可能需要自定义其内容。 请参阅有关如何自定义笔记本的相关文档。

  3. 如果创建了作业,请在 terraform apply 命令的输出中复制 job_url 旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。

    注意

    运行笔记本之前,可能需要自定义其内容。 可访问本文开头的链接,阅读有关如何自定义笔记本的相关文档。

  4. 如果创建了作业,请按如下所示运行作业:

    1. 单击作业页面上的“立即运行”。
    2. 作业运行结束后,要查看作业运行的结果,请在作业页上的“已完成的运行(过去 60 天)”列表中,单击“开始时间”列中最近的时间条目。 “输出”窗格显示运行笔记本代码的结果。

步骤 4:清理

在此步骤中,将从工作区中删除前面的资源。

  1. 通过运行 terraform plan 命令,检查 Terraform 在工作区中会执行哪些操作(在 Terraform 实际执行这些操作之前)。

    terraform plan
    
  2. 通过运行 terraform destroy 命令,将群集、笔记本和作业从工作区中删除。 提示删除时,键入 yes,然后按 Enter。

    terraform destroy
    

    Terraform 会删除项目中指定的资源。