我正在尝试从employee_table
获取 employeeId
,并使用此 id 查询employee_address
表以获取地址。
桌子没有任何问题。但是当我运行下面的代码时,我得到< code > org . Apache . spark . spark exception:Task not serializable
我想我知道这个问题。问题是sparkContext与master有关,而与worker无关。但我不知道该怎么理解这件事。
val employeeRDDRdd = sc.cassandraTable("local_keyspace", "employee_table")
try {
val data = employeeRDDRdd
.map(row => {
row.getStringOption("employeeID") match {
case Some(s) if (s != null) && s.nonEmpty => s
case None => ""
}
})
//create tuple of employee id and address. Filtering out cases when for an employee address is empty.
val id = data
.map(s => (s,getID(s)))
filter(tups => tups._2.nonEmpty)
//printing out total size of rdd.
println(id.count())
} catch {
case e: Exception => e.printStackTrace()
}
def getID(employeeID: String): String = {
val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")
val data = addressRDD.map(row => row.getStringOption("address") match {
case Some(s) if (s != null) && s.nonEmpty => s
case None => ""
})
data.collect()(0)
}
例外情况==
rg.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
序列化问题是由
val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")
此部分在序列化lambda内部使用,如下所示:
val id = data
.map(s => (s,getID(s)))
所有RDD
转换都表示远程执行的代码,这意味着它们的全部内容必须是可序列化的。
Spark 上下文不可序列化,但“getID”必须工作,因此存在异常。基本规则是你不能在任何RDD
转换中触摸SparkContext
。
如果你真的想在cassandra中加入数据,你有几个选择。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable
分别加载两个RDD并执行Spark Join
val leftrdd = sc.cassandraTable(test, table1)
val rightrdd = sc.cassandraTable(test, table2)
leftrdd.join(rightRdd)