我有一个包含两列的spark数据框架,“keyCol”列和“valCol”列。数据帧非常大,有近1亿行。我想以小批量(即每分钟10000条记录)为kafka主题编写/生成数据帧。该spark作业将每天运行一次,创建该数据帧
如何在下面的代码中实现每分钟10000条记录的小批量写入,或者请建议是否有更好/有效的方法来实现这一点。
spark_df.foreachPartition(partitions ->{
Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
while (partitions) {
Row row = partitions.next();
producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//Callback code goes here
}
});
}
return;
});
您可以使用如下 grouped(10000)
函数并执行睡眠线程一分钟
config.foreachPartition(f => {
f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch
roqSeq.foreach( row => {
producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
//Callback code goes here
}
})
})
Thread.sleep(60000) // Sleep for 1 minute
}
)
})