以编程方式创建多个表
可以将 Python 与 Delta Live Tables 配合使用,以编程方式创建多个表,以减少代码冗余。
你的管道可能包含多个流或数据集定义,它们只是在少量的参数上有差别。 这种冗余导致管道容易出错且难以维护。 例如,下图显示的管道使用消防部门数据集,根据不同的火警电话类别查找响应时间最快的社区。 在此示例中,并行流只是在几个参数上有差别。
使用 Python 进行的 Delta Live Tables 元编程示例
可以使用元编程模式来降低生成和维护冗余流定义的开销。 增量实时表中的元编程是使用 Python 内部函数进行的。 由于这些函数是惰性求值的,因此可以使用它们来创建只是输入参数有差别的流。 每个调用可以包含一组不同的参数,这些参数控制每个表的生成方式,如以下示例所示。
重要
由于延迟调用带有 Delta Live Tables 修饰器的 Python 函数,因此在循环中创建数据集时,你必须调用单独的函数来创建数据集,以确保使用正确的参数值。 未能在单独的函数中创建数据集会导致多个表使用循环的最终执行中的参数。
以下示例在循环内调用 create_table()
函数来创建表 t1
和 t2
:
def create_table(name):
@dlt.table(name=name)
def t():
return spark.read.table(name)
tables = ["t1", "t2"]
for t in tables:
create_table(t)
import dlt
from pyspark.sql.functions import *
@dlt.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
all_tables = []
def generate_tables(call_table, response_table, filter):
@dlt.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return (
spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM LIVE.raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
)
@dlt.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time "
)
def create_response_table():
return (
spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM LIVE.{call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
)
all_tables.append(response_table)
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dlt.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dlt.read(t) for t in all_tables]
unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)