如何构建简单、高效和低延迟的数据管道

如今的数据驱动型企业持续生成数据,这需要工程数据管道不断引入和转换这些数据。 这些管道应该能够精确处理和传递一次数据,生成延迟小于 200 毫秒的结果,并始终尝试将成本降到最低。

本文介绍工程数据管道的批处理和增量流处理方法、为什么增量流处理是更好的选择,以及用于开始使用 Databricks 增量流处理产品、 结构化流概念DLT 的后续步骤。 这些功能可实现快速编写和运行管道,保证交付语义、延迟、成本等。

重复批处理作业的陷阱

设置数据管道时,可以先编写重复的批处理作业来引入数据。 例如,每小时可以运行一个 Spark 作业,该作业从源读取数据并将数据写入到 Delta Lake 等接收器中。 这种方法的挑战在于以增量方式处理源,因为每小时运行的 Spark 作业需要从上一个作业结束的地方开始。 可以记录所处理数据的最新时间戳,然后选择时间戳比该时间戳更新的所有行,但这样做存在陷阱:

要运行连续数据管道,可以尝试安排每小时的批处理作业,该作业以增量方式从源读取、进行转换并将结果写入 Delta Lake 之类的接收器。 此方法可能存在陷阱:

  • 在时间戳之后查询所有新数据的 Spark 作业将会漏掉迟到的数据。
  • 如果处理不当,失败的 Spark 作业可能会导致破坏“仅一次”语义保证。
  • 列出云存储位置的内容来查找新文件的 Spark 作业将变得昂贵。

然后,仍需要重复转换此数据。 你可能会编写重复的批处理作业,然后合并数据或应用其他操作,这会进一步让管道复杂化并降低其效率。

批处理示例

为了充分理解批量引入和转换处理中管道可能遇到的陷阱,请参考以下示例。

丢失的数据

如果 Kafka 主题包含使用情况数据,用于确定向客户收取多少费用,并且管道正在批量引入,事件序列可能如下所示:

  1. 第一批有两条记录,分别在上午 8 点和 8 点 30 分。

  2. 您将最新的时间戳更新为上午 8:30。

  3. 上午 8:15 又有一条记录。

  4. 第二批查询了上午 8:30 之后的所有内容,因此你错过了上午 8:15的记录。

此外,你不希望对用户过度收费或少收费,因此必须确保每条记录恰好引入一次。

冗余处理

接下来,假设你的数据包含“用户购买”行,并且你想要聚合每小时的销售额,以便了解商店中最畅销的时间。 如果同一小时的购买分为不同的批次到达,那么将有多个批次产生同一小时的输出:

在同一小时内具有多次输入的批处理引入示例

上午 8 点到 9 点的窗口有两个元素(批次 1 的输出)、一个元素(批次 2 的输出)还是三个元素(没有任何批次的输出)? 产生给定时间窗口所需的数据出现在多个转换批次中。 要解决此问题,可以按天对数据进行分区,并在需要计算结果时重新处理整个分区。 然后,可以在接收器中覆盖结果:

批处理引入示例,表示重新处理以前的输入

然而,这是以延迟和成本为代价的,因为第二批需要做处理可能已处理数据的不必要工作。

增量流处理不存在陷阱

增量式流处理能够轻松避免重复批处理作业中涉及数据引入和转换的各种陷阱。 Databricks 结构化流式处理DLT 管理流式处理实现的复杂性,使你能够专注于业务逻辑。 只需指定要连接的源、对数据要执行的转换,以及结果写入的位置。

增量引入

Databricks 中的增量引入由 Apache Spark 结构化流式处理提供支持,它可以以增量方式使用数据源并将其写入接收器。 结构化流式处理引擎可以恰好使用一次数据,并且可以处理无序数据。 该引擎可以在笔记本中运行,也可以在 DLT 中使用流式处理表来运行。

Databricks 上的结构化流式处理引擎提供 AutoLoader 等专有的流式处理源,可以以经济高效的方式增量处理云文件。 Databricks 还为 Apache KafkaAmazon KinesisApache Pulsar 等其他常用消息总线提供连接器。

结构化流式处理还是 DLT?

结构化流式处理和 DLT 之间的显著差异在于操作流式处理查询的方式。 在结构化流式处理中,需要手动指定许多配置,并且必须手动将查询整合在一起。 必须显式启动查询,等待查询终止、失败时取消查询以及执行其他操作。 在 DLT 中,您以声明方式提供要让 DLT 运行的管道,它会持续地保持这些管道的运行。

有关这些功能的详细信息,请参阅 结构化流概念DLT

后续步骤