支持的语言
实时模式支持 Scala、Java 和 Python。
计算类型
实时模式支持以下计算类型:
对于使用 UDF 的延迟敏感工作负荷,Databricks 建议使用专用访问模式。 请参阅 表函数。
执行模式
实时模式仅支持更新模式:
| 执行模式 |
支持 |
| 更新模式 |
✓ |
| 追加模式 |
不支持 |
| 完整模式 |
不支持 |
源和汇
实时模式支持以下源和接收器:
| 源或汇 |
作为源 |
作为接收器 |
| Apache Kafka |
✓ |
✓ |
| 事件中心(使用 Kafka 连接器) |
✓ |
✓ |
| 动动力 |
• (仅 EFO 模式) |
不支持 |
| AWS MSK |
✓ |
不支持 |
| Delta |
不支持 |
不支持 |
| Google Pub/Sub(谷歌发布/订阅消息服务) |
不支持 |
不支持 |
| Apache Pulsar |
不支持 |
不支持 |
任意接收端(使用 forEachWriter) |
不適用 |
✓ |
运营商
实时模式支持大多数结构化流式处理运算符:
无状态操作
| Operator |
支持 |
| 选择 |
✓ |
| 投影 |
✓ |
mapPartitions |
不支持(请参阅限制) |
| Union |
• (有一些限制) |
UDFs
| Operator |
支持 |
| Scala 用户定义函数 (UDF) |
• (有一些限制) |
| Python 用户定义函数 (UDF) |
• (有一些限制) |
集合体
| 功能 |
支持 |
| sum |
✓ |
| 计数 |
✓ |
| 最大值 |
✓ |
| 分钟 |
✓ |
| avg |
✓ |
|
聚合函数 |
✓ |
Windowing
| Operator |
支持 |
| Tumbling |
✓ |
| Sliding |
✓ |
| 会期 |
不支持 |
去重
| Operator |
支持 |
| dropDuplicates |
✓ |
| 水印范围内删除重复项 |
✓ |
流式传输到表联接
| Operator |
支持 |
| 内联 |
✓ |
| 外部联接 |
✓ |
| 广播表联接(表大小为 10mb 或更少) |
✓ |
| 表联接(无广播) |
不支持 |
流到流联接
| Operator |
支持 |
| 内联 |
• (Databricks Runtime 18 及更高版本,具有一些配置) |
| 外部联接 |
不支持 |
注释
若要使用流以实时模式流式传输联接,必须设置其他 Spark 配置。 有关配置和运行多个流的要求的详细信息,请参阅 流到流联接。
任意有状态运算符
| Operator |
支持 |
| (平)MapGroupsWithState |
不支持 |
| transformWithState |
• (有一些差异) |
用户定义的接收器
| Sink |
支持 |
| forEach |
✓ |
| forEachBatch |
不支持 |
特殊注意事项
某些运算符和功能在实时模式下使用时具有特定的注意事项或差异。
为了构建自定义有状态应用程序,Databricks 支持 Apache Spark Structured Streaming 中的 transformWithState API。 有关 API 和代码片段的详细信息,请参阅 生成自定义有状态应用程序 。
但是,API 在实时模式下的行为与在微批处理查询中的行为不同。
- 实时模式为每一行调用
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)方法。
-
inputRows迭代器返回单个值。
微批处理模式为每个密钥调用一次, inputRows 迭代器返回微批处理中某个键的所有值。
- 编写代码时考虑此差异
- 实时模式下不支持事件时间计时器。
-
transformWithStateInPandas 在实时模式下不受支持。 请改用基于 transformWithState 行的 API,该 API 使用 Row 对象而不是 pandas 数据帧。
- 在实时模式下,计时器因数据到达而延迟触发:
- 如果计时器设置为10:00:00,但没有数据到达,计时器不会立刻触发。
- 如果数据到达 10:00:10,计时器会以 10 秒的延迟触发。
- 如果没有数据到达并且长时间运行的批处理正在终止,计时器会在批处理终止之前触发。
注释
在 Databricks Runtime 18.1 及更低版本中,如果使用 transformWithState 和实时模式进行低吞吐量的Python,则每秒少于 5 条记录,则可能会出现高达几百毫秒的延迟增加。 Databricks 建议升级到 Databricks Runtime 18.2 及更高版本以解决此问题。
以实时模式Python UDF
Databricks 在实时模式下支持大多数Python用户定义的函数(UDF):
无状态
| UDF 类型 |
支持 |
| Python标量 UDF (用户定义的标量函数 - Python) |
✓ |
| 箭头标量 UDF |
✓ |
| Pandas 标量 UDF(Pandas 用户定义函数) |
✓ |
箭头函数 (mapInArrow) |
✓ |
| Pandas 函数 (地图) |
✓ |
有状态分组(用户定义的聚合函数)
| UDF 类型 |
支持 |
transformWithState (仅 Row 接口) |
✓ |
transformWithStateInPandas |
不支持。 请改用基于 transformWithState 行的 API,该 API 使用 Row 对象而不是 pandas 数据帧。 有关详细信息,请参阅transformWithStateInPandas不支持。 |
applyInPandasWithState |
不支持 |
非有状态分组(UDAF)
| UDF 类型 |
支持 |
apply |
不支持 |
applyInArrow |
不支持 |
applyInPandas |
不支持 |
表函数
在实时模式下使用 Python UDF 时,需要考虑以下几点:
- 若要最大程度地减少延迟,请将箭头批大小 (
spark.sql.execution.arrow.maxRecordsPerBatch) 设置为 1。
- 权衡:此配置以牺牲吞吐量为代价优化延迟。 对于大多数工作负荷,建议使用此设置。
- 仅在需要更高的吞吐量来容纳输入量时才增加批处理大小,并接受可能的延迟增加。
- Pandas UDF 和函数在 Arrow 批大小为 1 时性能不佳。
- 如果使用 pandas UDF 或函数,请将箭头批大小设置为更高的值(例如 100 或更高)。
- 这意味着更高的延迟。 Databricks 建议尽可能使用箭头 UDF 或函数。
-
transformWithStateInPandas 在实时模式下不受支持。 请改用基于 transformWithState 行的 API,该 API 使用 Row 对象而不是 pandas 数据帧。 有关使用基于行的 API 的工作Python示例,请参阅 transformWithStateInPandas 和 real-time mode examples。
- 对于使用 UDF 的延迟敏感工作负荷,Databricks 建议使用专用访问模式。 在标准访问模式下,安全隔离开销可能会降低 UDF 性能。