我们有一项在时间窗口内聚合数据的工作。我们是spark的新手,我们观察到运行逻辑上与流处理作业相同的查询与批处理作业的性能特性有显著不同。我们希望了解正在发生的事情,并找到可能的方法来提高基于结构化流的方法的速度。
为了这篇文章,假设模式是
root
|-- objectId: long (nullable = true)
|-- eventTime: long (nullable = true)
|-- date: date (nullable = true)
|-- hour: integer (nullable = true)
哪里
date
和< code>hour是(派生的)分区键,即拼花文件存储在类似< code > date = 2020-07-26/hour = 4 的文件夹中。
< li >基础格式类型是三角洲湖。
< li >一小时的数据有大约2亿个事件
< li>objectId
分布很广(一小时内观察到1000万个不同的值,分布非常不均匀)
< li >我们正在尝试计算每个< code>objectId在5分钟时段内的事件数
< li >基础源代码从kafka队列中流出(并且每分钟运行一次)
< ul >
< li >每分钟有两个新文件出现在ADL2上,每个文件的大小为25MB(实际文件包含大约10个以上未显示的附加列)我们正在运行一个结构化的流式处理作业,基本上是执行以下操作:
df.read.format("delta")
.withWatermark("7 minutes") // watermark only applied to streaming query
.groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
.coalesce(1) // debatable; we like limited number of files
.partitionBy("date", "hour")
.writeStream
.format("delta")
.option("checkpointLocation", <...>)
.partitionBy("date", "hour")
.start(<destination url>)
.awaitTermination
除了withWatermark
和writeStream
等的类似替换之外,关联的批处理作业基本上执行相同的操作。
我们在以下位置运行这些程序:
观察:
我的理解是,在给定水印(7分钟)和窗口大小(5分钟)的情况下,流式查询只需回顾不到15分钟,直到它可以写出5分钟的窗口并丢弃所有相关状态。
问题:
df.read.format(“delta”)
看起来您正在创建一个静态数据帧,然后将该静态数据帧转换为流式数据帧。聚合应用于静态数据帧,因此窗口可能不起作用。尝试创建流数据帧:
val DF = spark
.readStream
.format("delta")...
这里可以找到一些示例https://docs.databricks.com/delta/delta-streaming.html#delta-表即流源