Behavior of the randomSplit method

When using randomSplit on a DataFrame, you could observe inconsistent behavior. Here is an example:

from pyspark.sql.functions import broadcast

# Read from a source whose partition contents are not deterministic
df = spark.read.format('inconsistent_data_source').load()
a, b = df.randomSplit([0.5, 0.5])

# Rows assigned to both splits; this count is expected to be 0
a.join(broadcast(b), on='id', how='inner').count()

Typically this query returns 0. However, depending on the underlying data source or input DataFrame, in some cases the query can return more than 0 records.

This unexpected behavior occurs because the data distribution across RDD partitions is not idempotent; it can be rearranged or updated during query execution, which affects the output of the randomSplit method.
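One way to see this in practice is to measure both the overlap between the two splits and the rows that fall out of them entirely. The following is a minimal diagnostic sketch that reuses df, a, and b from the example above and assumes the data has a unique id column; against a non-deterministic source, either count can come back nonzero and can change between runs.

from pyspark.sql.functions import broadcast

# Rows assigned to both splits (expected: 0)
overlap = a.join(broadcast(b), on='id', how='inner').count()

# Rows assigned to neither split (expected: 0)
covered = a.select('id').union(b.select('id'))
lost = df.select('id').join(covered, on='id', how='left_anti').count()

print(f"overlap={overlap}, lost={lost}")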

Note

  • Spark DataFrames and RDDs preserve partitioning order; this problem only exists when the query output depends on the actual distribution of data across partitions, for example, when values from files 1, 2, and 3 always appear in partition 1.
  • The issue can also be observed when using the Delta cache. In that case, all of the solutions listed below still apply.

Solution

Do one of the following:

  • Use explicit Apache Spark RDD caching

    # Cache the input so both splits read the same materialized data
    df = inputDF.cache()
    a, b = df.randomSplit([0.5, 0.5])
    
  • Repartition by a column or a set of columns

    # Shuffle the data into a deterministic layout keyed on col1
    df = inputDF.repartition(100, 'col1')
    a, b = df.randomSplit([0.5, 0.5])
    
  • Apply an aggregate function

    # Aggregation forces a shuffle, which fixes the partition contents
    df = inputDF.groupBy('col1').count()
    a, b = df.randomSplit([0.5, 0.5])
    

These operations persist or shuffle the data, resulting in a consistent data distribution across partitions in Spark jobs.
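As a quick end-to-end check, the sketch below applies the caching fix to the original example and re-runs the join test. The explicit count() call used to materialize the cache and the id column are assumptions carried over from the example above, not part of the documented solution.

from pyspark.sql.functions import broadcast

df = spark.read.format('inconsistent_data_source').load().cache()
df.count()  # run an action so the cache is fully materialized

a, b = df.randomSplit([0.5, 0.5])

# With a stable, cached input the two splits are disjoint
assert a.join(broadcast(b), on='id', how='inner').count() == 0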