提问者:小点点

如何使用Spark Java将Spark数据帧写入Kafka Produceer时控制记录数


我有一个包含两列的spark数据框架,“keyCol”列和“valCol”列。数据帧非常大,有近1亿行。我想以小批量(即每分钟10000条记录)为kafka主题编写/生成数据帧。该spark作业将每天运行一次,创建该数据帧

如何在下面的代码中实现每分钟10000条记录的小批量写入,或者请建议是否有更好/有效的方法来实现这一点。

spark_df.foreachPartition(partitions ->{
            Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
            while (partitions) {
                Row row =  partitions.next();
                producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       //Callback code goes here
                    }
                });
            }
            return;
        });

共1个答案

匿名用户

您可以使用如下 grouped(10000) 函数并执行睡眠线程一分钟

config.foreachPartition(f => {
      f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch

        roqSeq.foreach( row => {
          producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
            def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
              //Callback code goes here
            }
          })
        })
          Thread.sleep(60000) // Sleep for 1 minute
        }
      )
    })