提问者:小点点

结构化流与批处理性能的差异


我们有一项在时间窗口内聚合数据的工作。我们是spark的新手,我们观察到运行逻辑上与流处理作业相同的查询与批处理作业的性能特性有显著不同。我们希望了解正在发生的事情,并找到可能的方法来提高基于结构化流的方法的速度。

为了这篇文章,假设模式是

root
 |-- objectId: long (nullable = true)
 |-- eventTime: long (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)

哪里

    < li>date和< code>hour是(派生的)分区键,即拼花文件存储在类似< code > date = 2020-07-26/hour = 4 的文件夹中。 < li >基础格式类型是三角洲湖。 < li >一小时的数据有大约2亿个事件 < li>objectId分布很广(一小时内观察到1000万个不同的值,分布非常不均匀) < li >我们正在尝试计算每个< code>objectId在5分钟时段内的事件数 < li >基础源代码从kafka队列中流出(并且每分钟运行一次) < ul > < li >每分钟有两个新文件出现在ADL2上,每个文件的大小为25MB(实际文件包含大约10个以上未显示的附加列)

我们正在运行一个结构化的流式处理作业,基本上是执行以下操作:

df.read.format("delta")
  .withWatermark("7 minutes") // watermark only applied to streaming query
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .coalesce(1) // debatable; we like limited number of files
  .partitionBy("date", "hour")
  .writeStream
  .format("delta")
  .option("checkpointLocation", <...>)
  .partitionBy("date", "hour")
  .start(<destination url>)
  .awaitTermination

除了withWatermarkwriteStream等的类似替换之外,关联的批处理作业基本上执行相同的操作。

我们在以下位置运行这些程序:

  • 蓝色数据砖
  • 蔚蓝数据湖2代

观察:

    < li >批处理作业能够在尽可能小的集群(3x F4s)上运行,在大约一分钟内聚合一个小时的时间 < li >即使使用(3个DS3_v2)也无法完成结构化流作业,因此我们必须配置更大的实例(3个l4,每个节点32GB) < ul > < li > CPUs实际上处于空闲状态(97.4%空闲) < li >每个微批次需要30-60秒(几乎全部花费在< code>addBatch) < li >低网络活动(可能2MB / s)

我的理解是,在给定水印(7分钟)和窗口大小(5分钟)的情况下,流式查询只需回顾不到15分钟,直到它可以写出5分钟的窗口并丢弃所有相关状态。

问题:

  • 为什么基于结构化流的解决方案需要更多的内存?
    • 假设我们必须维护大约 1000 万个条目的状态,我不明白我们怎么需要那么多

共1个答案

匿名用户

df.read.format(“delta”)

看起来您正在创建一个静态数据帧,然后将该静态数据帧转换为流式数据帧。聚合应用于静态数据帧,因此窗口可能不起作用。尝试创建流数据帧:

  val DF = spark
  .readStream
  .format("delta")...

这里可以找到一些示例https://docs.databricks.com/delta/delta-streaming.html#delta-表即流源