Notebook workflows
The %run command allows you to include another notebook within a notebook. This command lets you concatenate various notebooks that represent key ETL steps, Spark analysis steps, or ad-hoc exploration. However, it lacks the ability to build more complex data pipelines.
Notebook workflows are a complement to %run because they let you return values from a notebook. This allows you to easily build complex workflows and pipelines with dependencies. You can properly parameterize runs (for example, get a list of files in a directory and pass the names to another notebook, something that is not possible with %run) and also create if/then/else workflows based on return values. Notebook workflows allow you to call other notebooks via relative paths.
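For instance, here is a minimal sketch of the parameterization pattern mentioned above. The directory path, the "process-file" notebook name, and the "file" argument name are all hypothetical; the target notebook is assumed to read the argument through a widget.

# A minimal sketch, assuming a hypothetical "process-file" notebook
# that reads its input path from a widget named "file".
files = [f.path for f in dbutils.fs.ls("/tmp/input")]

for path in files:
  result = dbutils.notebook.run("process-file", 600, {"file": path})
  print(path, "->", result)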
You implement notebook workflows with dbutils.notebook methods. These methods, like all of the dbutils APIs, are available only in Scala and Python. However, you can use dbutils.notebook.run to invoke an R notebook.
Note
Long-running notebook workflow jobs that take more than 48 hours to complete are not supported.
API
The methods available in the dbutils.notebook API to build notebook workflows are: run and exit. Both parameters and return values must be strings.
run(path: String, timeout_seconds: int, arguments: Map): String
Run a notebook and return its exit value. The method starts an ephemeral job that runs immediately.
The timeout_seconds parameter controls the timeout of the run (0 means no timeout): the call to run throws an exception if it doesn't finish within the specified time. If Azure Databricks is down for more than 10 minutes, the notebook run fails regardless of timeout_seconds.
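For example, here is a minimal sketch of guarding a call with a timeout; the notebook path is hypothetical.

# Fail the run if the callee takes longer than 5 minutes (300 seconds).
try:
  result = dbutils.notebook.run("/path/to/slow-notebook", 300)
except Exception as e:
  print("Run failed or timed out:", e)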
The arguments parameter sets widget values of the target notebook. Specifically, if the notebook you are running has a widget named A, and you pass a key-value pair ("A": "B") as part of the arguments parameter to the run() call, then retrieving the value of widget A will return "B". You can find the instructions for creating and working with widgets in the Widgets article.
Warning
The arguments parameter accepts only Latin characters (ASCII character set). Using non-ASCII characters will return an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
run Usage
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run Example
Suppose you have a notebook named workflows with a widget named foo that prints the widget's value:
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))
Running dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produces the following result:
The widget had the value you passed in through the workflow, "bar", rather than the default.
exit(value: String): void
Exit a notebook with a value. If you call a notebook using the run method, this is the value returned.
dbutils.notebook.exit("returnValue")
Calling dbutils.notebook.exit in a job causes the notebook to complete successfully. If you want to cause the job to fail, throw an exception.
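For example, here is a minimal sketch of that pattern, assuming a hypothetical validation check on a table named my_data:

# Complete successfully with a return value, or raise to fail the job.
row_count = spark.table("my_data").count()  # hypothetical validation step

if row_count > 0:
  dbutils.notebook.exit("OK")
else:
  raise Exception("Job failed: my_data is empty")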
Example
In the following example, you pass arguments to DataImportNotebook and run different notebooks (DataCleaningNotebook or ErrorHandlingNotebook) based on the result from DataImportNotebook.
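A minimal sketch of this pattern follows; the "OK" status value and the "source" argument name are assumptions, not part of the original example.

# Run the import step, then branch on its returned status string.
status = dbutils.notebook.run("DataImportNotebook", 3600, {"source": "default"})

if status == "OK":
  dbutils.notebook.run("DataCleaningNotebook", 3600)
else:
  dbutils.notebook.run("ErrorHandlingNotebook", 3600)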
When the notebook workflow runs, you see a link to the running notebook:
Click the notebook link Notebook job #xxxx to view the details of the run:
Pass structured data
This section illustrates how to pass structured data between notebooks.
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
sqlContext.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
sqlContext.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
Handle errors
This section illustrates how to handle errors in notebook workflows.
Python
# Errors in workflows throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors in workflows throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
Run multiple notebooks concurrently
You can run multiple notebooks at the same time by using standard Scala and Python constructs such as Threads (Scala, Python) and Futures (Scala, Python). The advanced notebook workflow notebooks demonstrate how to use these constructs. The notebooks are in Scala, but you could easily write the equivalent in Python; a minimal Python sketch follows the list below. To run the example:
- Download the notebook archive.
- Import the archive into a workspace.
- Run the Concurrent Notebooks notebook.
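As referenced above, here is a minimal Python sketch of running notebooks concurrently with standard futures; the notebook paths are hypothetical.

# Run two notebooks at the same time using standard Python futures.
from concurrent.futures import ThreadPoolExecutor

notebooks = ["/path/to/notebook-a", "/path/to/notebook-b"]

with ThreadPoolExecutor(max_workers=2) as executor:
  results = list(executor.map(lambda path: dbutils.notebook.run(path, 1800), notebooks))

print(results)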