Notebook workflows

The %run command allows you to include another notebook within a notebook. This command lets you concatenate various notebooks that represent key ETL steps, Spark analysis steps, or ad hoc exploration. However, it lacks the ability to build more complex data pipelines.

Notebook workflows are a complement to %run because they let you return values from a notebook. This allows you to easily build complex workflows and pipelines with dependencies. You can properly parameterize runs (for example, get a list of files in a directory and pass the names to another notebook, which isn’t possible with %run) and also create if/then/else workflows based on return values. Notebook workflows allow you to call other notebooks via relative paths.
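For example, the file-list pattern above might look like the following Python sketch. The directory path, the process-file notebook, and its file_path widget are hypothetical names used for illustration:

files = dbutils.fs.ls("/mnt/raw-data")  # list the files in a directory
for f in files:
    # Pass each file name to another notebook -- something %run cannot do.
    result = dbutils.notebook.run("process-file", 600, {"file_path": f.path})
    print("Processed", f.path, "->", result)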

You implement notebook workflows with dbutils.notebook methods. These methods, like all of the dbutils APIs, are available only in Scala and Python. However, you can use dbutils.notebook.run to invoke an R notebook.

Note

Long-running notebook workflow jobs that take more than 48 hours to complete are not supported.

API

The methods available in the dbutils.notebook API to build notebook workflows are: run and exit. Both parameters and return values must be strings.

run(path: String, timeout_seconds: int, arguments: Map): String

Run a notebook and return its exit value. The method starts an ephemeral job that runs immediately.

The timeout_seconds parameter controls the timeout of the run (0 means no timeout): the call to run throws an exception if it doesn’t finish within the specified time. If Azure Databricks is down for more than 10 minutes, the notebook run fails regardless of timeout_seconds.
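Because run throws on timeout, a caller can catch the exception and fall back. A minimal sketch, assuming a hypothetical long-running-notebook path:

try:
    result = dbutils.notebook.run("long-running-notebook", 300, {})
except Exception as e:
    # Raised if the notebook does not finish within 300 seconds.
    print("Run failed or timed out:", e)
    result = None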

The arguments parameter sets widget values of the target notebook. Specifically, if the notebook you are running has a widget named A, and you pass a key-value pair ("A": "B") as part of the arguments parameter to the run() call, then retrieving the value of widget A will return "B". You can find the instructions for creating and working with widgets in the Widgets article.

Warning

The arguments parameter accepts only Latin characters (ASCII character set). Using non-ASCII characters will return an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.

run Usage

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run Example

Suppose you have a notebook named workflows with a widget named foo that prints the widget’s value:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

Running dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produces the following result:

[Image: Notebook workflow with widget]

The widget had the value you passed in through the workflow, "bar", rather than the default.

exit(value: String): void

Exit a notebook with a value. If you call a notebook using the run method, this is the value returned.

dbutils.notebook.exit("returnValue")

Calling dbutils.notebook.exit in a job causes the notebook to complete successfully. If you want to cause the job to fail, throw an exception.
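For example, a callee notebook can exit with a value on success and raise to fail the job. A minimal sketch; the table name and validation check are illustrative assumptions:

# In the callee notebook
row_count = spark.table("some_table").count()  # hypothetical validation check
if row_count == 0:
    raise Exception("No input rows found; failing the job")
dbutils.notebook.exit(str(row_count))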

Example

In the following example, you pass arguments to DataImportNotebook and run different notebooks (DataCleaningNotebook or ErrorHandlingNotebook) based on the result from DataImportNotebook.
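A minimal Python sketch of what such a caller might look like. The timeout, the source argument, and the "OK" status convention are assumptions based on the description above, not the exact code in the screenshot:

# Run the import step and branch on its exit value.
status = dbutils.notebook.run("DataImportNotebook", 3600, {"source": "/hypothetical/path"})

if status == "OK":
    dbutils.notebook.run("DataCleaningNotebook", 3600)
else:
    dbutils.notebook.run("ErrorHandlingNotebook", 3600)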

[Image: Notebook workflow]

When the notebook workflow runs, you see a link to the running notebook:

[Image: Notebook workflow run]

Click the Notebook job #xxxx link to view the details of the run:

[Image: Notebook workflow run result]

Pass structured data

This section illustrates how to pass structured data between notebooks.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
sqlContext.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
sqlContext.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Handle errors

This section illustrates how to handle errors in notebook workflows.

Python

# Errors in workflows are raised as a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print "Retrying error", e
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors in workflows are thrown as a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Run multiple notebooks concurrently

You can run multiple notebooks at the same time by using standard Scala and Python constructs such as Threads (Scala, Python) and Futures (Scala, Python). The advanced notebook workflow notebooks demonstrate how to use these constructs. The notebooks are in Scala, but you could easily write the equivalent in Python; a minimal sketch follows the steps below. To run the example:

  1. Download the notebook archive.
  2. Import the archive into a workspace.
  3. Run the Concurrent Notebooks notebook.
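
A minimal Python sketch of the concurrent pattern using the standard concurrent.futures module; the notebook names and arguments are hypothetical:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical notebooks to run in parallel, each with its own arguments.
notebooks = [
    ("notebook-a", {"arg": "1"}),
    ("notebook-b", {"arg": "2"}),
]

# dbutils.notebook.run blocks until the callee finishes, so submitting each
# call to its own thread lets the notebooks execute concurrently.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(dbutils.notebook.run, path, 600, args)
               for path, args in notebooks]
    results = [f.result() for f in futures]

print(results)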