使用 Terraform 创建群集、笔记本和作业
本文介绍如何使用 Databricks Terraform 提供程序 在现有 Azure Databricks 工作区中创建群集、笔记本和作业。
本文是以下 Azure Databricks 入门文章的补充文章:
教程:运行端到端湖屋分析管道,其中使用了可处理 Unity Catalog 的群集、Python 笔记本和运行笔记本的作业。
快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业,其中使用了通用群集和 Python 笔记本。
还可以调整本文中的 Terraform 配置,在工作区中创建自定义群集、笔记本和作业。
步骤 1:创建并配置 Terraform 项目
按照 Databricks Terraform 提供程序概述文章的要求部分中的说明创建 Terraform 项目。
若要创建群集,请创建一个名为
cluster.tf
的文件,并将以下内容添加到该文件。 此内容创建一个群集,其中包含至少需包含的资源量。 此群集使用最新的 Databricks Runtime 长期支持 (LTS) 版本。对于要使用 Unity Catalog 的群集:
variable "cluster_name" {} variable "cluster_autotermination_minutes" {} variable "cluster_num_workers" {} variable "cluster_data_security_mode" {} # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers data_security_mode = var.cluster_data_security_mode } output "cluster_url" { value = databricks_cluster.this.url }
对于通用群集:
variable "cluster_name" { description = "A name for the cluster." type = string default = "My Cluster" } variable "cluster_autotermination_minutes" { description = "How many minutes before automatically terminating due to inactivity." type = number default = 60 } variable "cluster_num_workers" { description = "The number of workers." type = number default = 1 } # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers } output "cluster_url" { value = databricks_cluster.this.url }
若要创建群集,请创建另一个名为
cluster.auto.tfvars
的文件,并将以下内容添加到该文件。 此文件包含用于自定义群集的变量值。 将占位符值替换为你自己的值。对于要使用 Unity Catalog 的群集:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1 cluster_data_security_mode = "SINGLE_USER"
对于通用群集:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1
若要创建笔记本,请创建另一个名为
notebook.tf
的文件,并将以下内容添加到该文件:variable "notebook_subdirectory" { description = "A name for the subdirectory to store the notebook." type = string default = "Terraform" } variable "notebook_filename" { description = "The notebook's filename." type = string } variable "notebook_language" { description = "The language of the notebook." type = string } resource "databricks_notebook" "this" { path = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}" language = var.notebook_language source = "./${var.notebook_filename}" } output "notebook_url" { value = databricks_notebook.this.url }
如果要创建群集,请将以下笔记本代码保存到文件
notebook.tf
所在的同一目录的某文件中:对于教程:运行端到端湖屋分析管道的 Python 笔记本,是包含以下内容、名为
notebook-getting-started-lakehouse-e2e.py
的文件:# Databricks notebook source external_location = "<your_external_location>" catalog = "<your_catalog>" dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True) display(dbutils.fs.head(f"{external_location}/foobar.txt")) dbutils.fs.rm(f"{external_location}/foobar.txt") display(spark.sql(f"SHOW SCHEMAS IN {catalog}")) # COMMAND ---------- from pyspark.sql.functions import col # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"{catalog}.e2e_lakehouse_{username}_db" source = f"{external_location}/e2e-lakehouse-source" table = f"{database}.target_table" checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") # Clear out data from previous demo execution dbutils.fs.rm(source, True) dbutils.fs.rm(checkpoint_path, True) # Define a class to load batches of data to source class LoadData: def __init__(self, source): self.source = source def get_date(self): try: df = spark.read.format("json").load(source) except: return "2016-01-01" batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0] if batch_date.month == 3: raise Exception("Source data exhausted") return batch_date def get_batch(self, batch_date): return ( spark.table("samples.nyctaxi.trips") .filter(col("tpep_pickup_datetime").cast("date") == batch_date) ) def write_batch(self, batch): batch.write.format("json").mode("append").save(self.source) def land_batch(self): batch_date = self.get_date() batch = self.get_batch(batch_date) self.write_batch(batch) RawData = LoadData(source) # COMMAND ---------- RawData.land_batch() # COMMAND ---------- # Import functions from pyspark.sql.functions import col, current_timestamp # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .option("mergeSchema", "true") .toTable(table)) # COMMAND ---------- df = spark.read.table(table_name) # COMMAND ---------- display(df)
对于快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业的 Python 笔记本,是包含以下内容、名为
notebook-quickstart-create-databricks-workspace-portal.py
的文件:# Databricks notebook source blob_account_name = "azureopendatastorage" blob_container_name = "citydatacontainer" blob_relative_path = "Safety/Release/city=Seattle" blob_sas_token = r"" # COMMAND ---------- wasbs_path = 'wasbs://%s@%s.blob.core.chinacloudapi.cn/%s' % (blob_container_name, blob_account_name,blob_relative_path) spark.conf.set('fs.azure.sas.%s.%s.blob.core.chinacloudapi.cn' % (blob_container_name, blob_account_name), blob_sas_token) print('Remote blob path: ' + wasbs_path) # COMMAND ---------- df = spark.read.parquet(wasbs_path) print('Register the DataFrame as a SQL temporary view: source') df.createOrReplaceTempView('source') # COMMAND ---------- print('Displaying top 10 rows: ') display(spark.sql('SELECT * FROM source LIMIT 10'))
如果要创建笔记本,请创建另一个名为
notebook.auto.tfvars
的文件,并将以下内容添加到该文件。 此文件包含用于自定义笔记本配置的变量值。对于教程:运行端到端湖屋分析管道的 Python 笔记本:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-getting-started-lakehouse-e2e.py" notebook_language = "PYTHON"
对于快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业的 Python 笔记本:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-quickstart-create-databricks-workspace-portal.py" notebook_language = "PYTHON"
如果要在 Azure Databricks 工作区中创建笔记本,请务必参考以下说明、设置笔记本成功运行的任何要求:
- 教程:运行端到端湖屋分析管道的 Python 笔记本
- 用于快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业的 Python 笔记本
若要创建作业,请创建另一个名为
job.tf
的文件,并将以下内容添加到该文件。 此内容创建用于运行笔记本的作业。variable "job_name" { description = "A name for the job." type = string default = "My Job" } variable "task_key" { description = "A name for the task." type = string default = "my_task" } resource "databricks_job" "this" { name = var.job_name task { task_key = var.task_key existing_cluster_id = databricks_cluster.this.cluster_id notebook_task { notebook_path = databricks_notebook.this.path } } email_notifications { on_success = [ data.databricks_current_user.me.user_name ] on_failure = [ data.databricks_current_user.me.user_name ] } } output "job_url" { value = databricks_job.this.url }
如果要创建作业,请创建另一个名为
job.auto.tfvars
的文件,并将以下内容添加到该文件。 此文件包含用于自定义作业配置的变量值。job_name = "My Job" task_key = "my_task"
步骤 2:运行配置
在此步骤中,将运行 Terraform 配置,将群集、笔记本和作业部署到 Azure Databricks 工作区。
通过运行
terraform validate
命令检查 Terraform 配置是否有效。 如果报告了任何错误,请解决错误,然后再次运行命令。terraform validate
通过运行
terraform plan
命令,检查 Terraform 在工作区中会执行哪些操作(在 Terraform 实际执行这些操作之前)。terraform plan
通过运行
terraform apply
命令,将群集、笔记本和作业部署到工作区中。 提示部署时,键入yes
,然后按 Enter。terraform apply
Terraform 会部署项目中指定的资源。 部署这些资源(特别是群集)可能需要几分钟时间。
步骤 3:浏览结果
如果创建了群集,请在
terraform apply
命令的输出中复制cluster_url
旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。如果创建了笔记本,请在
terraform apply
命令的输出中复制notebook_url
旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。注意
在使用笔记本之前,可能需要自定义其内容。 请参阅有关如何自定义笔记本的相关文档。
如果创建了作业,请在
terraform apply
命令的输出中复制job_url
旁边的链接,并将其粘贴到 Web 浏览器的地址栏中。注意
运行笔记本之前,可能需要自定义其内容。 可访问本文开头的链接,阅读有关如何自定义笔记本的相关文档。
如果创建了作业,请按如下所示运行作业:
- 单击作业页面上的“立即运行”。
- 作业运行结束后,要查看作业运行的结果,请在作业页上的“已完成的运行(过去 60 天)”列表中,单击“开始时间”列中最近的时间条目。 “输出”窗格显示运行笔记本代码的结果。
步骤 4:清理
在此步骤中,将从工作区中删除前面的资源。
通过运行
terraform plan
命令,检查 Terraform 在工作区中会执行哪些操作(在 Terraform 实际执行这些操作之前)。terraform plan
通过运行
terraform destroy
命令,将群集、笔记本和作业从工作区中删除。 提示删除时,键入yes
,然后按 Enter。terraform destroy
Terraform 会删除项目中指定的资源。