提问者:小点点

Spark:ForeachRDD,跳过行抛出任务不可序列化(scala闭包)


我有一个流HDFS文本文件的代码。但是每个文本文件都包含一个50行的标题和描述。我想忽略这些行,只接收数据。

这是我的代码,但它引发了Spark异常:任务不可序列化

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      if (partitionIdx == 0) {
        lines.drop(50)
      }
      lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        ...
    }
  }
)

共2个答案

匿名用户

任务不可序列化错误发生在这种情况下:将函数传递给Spark:引用整个对象有什么风险?或者运行apache spark作业时出现任务不可序列化异常

很可能您正在那里创建某种对象并在RDD方法中调用其函数,该方法强制引擎序列化您的对象。

不幸的是,您打印的代码部分运行得非常好,问题出在用点替换的部分。例如,这个工作:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._

val ssc = new StreamingContext(sc, Seconds(60))
val hdfsDStream = ssc.textFileStream("/sparkdemo/streaming")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      if (partitionIdx == 0) {
        lines.drop(50)
      }
      lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        rowRDD.take(10).foreach(println)
    }
  }
)
ssc.start()
ssc.awaitTermination()

匿名用户

我认为你只需要用zipWithIndex过滤掉Index小于50的情况。

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDstream.foreachRDD( rdd => {
  val data = rdd.zipWithIndex.filter( _._2 < 50 ).map( _._1 )

  // Now do whatever you want with your data.
} )

也。。。这里 - Row(p(0),p(1),p(2),p(3)),你真的需要Row突然吗?