我有一个流HDFS文本文件的代码。但是每个文本文件都包含一个50行的标题和描述。我想忽略这些行,只接收数据。
这是我的代码,但它引发了Spark异常:任务不可序列化
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")
hdfsDStream.foreachRDD(
rdd => {
val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
=> {
if (partitionIdx == 0) {
lines.drop(50)
}
lines
})
val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))
if (data.count() > 0) {
...
}
}
)
任务不可序列化错误发生在这种情况下:将函数传递给Spark:引用整个对象有什么风险?或者运行apache spark作业时出现任务不可序列化异常
很可能您正在那里创建某种对象并在RDD方法中调用其函数,该方法强制引擎序列化您的对象。
不幸的是,您打印的代码部分运行得非常好,问题出在用点替换的部分。例如,这个工作:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
val ssc = new StreamingContext(sc, Seconds(60))
val hdfsDStream = ssc.textFileStream("/sparkdemo/streaming")
hdfsDStream.foreachRDD(
rdd => {
val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
=> {
if (partitionIdx == 0) {
lines.drop(50)
}
lines
})
val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))
if (data.count() > 0) {
rowRDD.take(10).foreach(println)
}
}
)
ssc.start()
ssc.awaitTermination()
我认为你只需要用zipWithIndex过滤掉Index小于50的情况。
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")
hdfsDstream.foreachRDD( rdd => {
val data = rdd.zipWithIndex.filter( _._2 < 50 ).map( _._1 )
// Now do whatever you want with your data.
} )
也。。。这里 - Row(p(0),p(1),p(2),p(3)),
你真的需要Row
突然吗?