Asked by: 小点点

org.apache.spark.SparkException: Task not serializable in Spark Scala


I am trying to get the employeeId from employee_table and then use that id to query the employee_address table for the address.

There is nothing wrong with the tables themselves, but when I run the code below I get org.apache.spark.SparkException: Task not serializable.

I think I understand the problem: the SparkContext only lives on the master (driver), not on the workers. But I don't know how to work around it.

val employeeRDDRdd = sc.cassandraTable("local_keyspace", "employee_table")


try {

  val data = employeeRDDRdd
    .map(row => {
      row.getStringOption("employeeID") match {
        case Some(s) if (s != null) && s.nonEmpty => s
        case None => ""
      }
    })

    // Create tuples of (employee id, address), filtering out employees whose address is empty.

  val id = data
    .map(s => (s, getID(s)))
    .filter(tups => tups._2.nonEmpty)

  // Print out the total size of the RDD.
  println(id.count())




} catch {
  case e: Exception => e.printStackTrace()
}

def getID(employeeID: String): String = {
  val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")
  val data = addressRDD.map(row => row.getStringOption("address") match {
    case Some(s) if s != null && s.nonEmpty => s
    case _ => ""
  })
  data.collect()(0)
}

Exception ==

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)

1 Answer

Anonymous user

The serialization problem is caused by this call:

val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")

It is used inside a lambda that has to be serialized, here:

val id = data
  .map(s => (s,getID(s)))

All RDD transformations represent code that is executed remotely on the executors, which means everything they reference must be serializable.

The SparkContext is not serializable, but getID needs it to do its work, hence the exception. The basic rule is that you must not touch the SparkContext inside any RDD transformation.
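
A minimal sketch of that rule, using a hypothetical RDD of employee ids (the names ids and prefixed are made up for illustration):

// Hypothetical RDD of employee ids, built on the driver.
val ids = sc.parallelize(Seq("e1", "e2"))

// NOT OK: the closure passed to map captures sc, which is not serializable,
// so Spark fails with "Task not serializable" while cleaning the closure.
// ids.map(id => sc.cassandraTable("local_keyspace", "employee_address").count())

// OK: sc is only used on the driver; the closure shipped to the executors
// only references plain serializable values.
val prefixed = ids.map(id => "id-" + id)
println(prefixed.count())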

If you actually want to join data from Cassandra, you have a couple of options.

Use joinWithCassandraTable, which performs the lookup on the Cassandra side:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable
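
A rough sketch of that approach, assuming employeeID is (or maps onto) the partition key of employee_address; the column names below simply mirror the question and may need adjusting to the real schema:

import com.datastax.spark.connector._

// Build an RDD of keys to look up; each element must map onto the
// join columns (by default the partition key) of employee_address.
val employeeIds = sc.cassandraTable("local_keyspace", "employee_table")
  .map(row => Tuple1(row.getString("employeeID")))

// Server-side join: the connector fetches the matching employee_address
// rows for each key, without ever touching sc inside a transformation.
val joined = employeeIds.joinWithCassandraTable("local_keyspace", "employee_address")

val idAndAddress = joined
  .map { case (Tuple1(id), addressRow) => (id, addressRow.getStringOption("address").getOrElse("")) }
  .filter { case (_, address) => address.nonEmpty }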

Or load the two tables as separate RDDs and perform a Spark join:

// Key both RDDs by the join column before calling join; the column name
// here is an assumption -- use whatever column links the two tables.
val leftRdd  = sc.cassandraTable("test", "table1").map(row => (row.getString("employeeID"), row))
val rightRdd = sc.cassandraTable("test", "table2").map(row => (row.getString("employeeID"), row))
leftRdd.join(rightRdd)
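
From there the joined result can be reduced to the (employeeId, address) pairs the question is after (again assuming the column names from the question):

// Keep only (employeeId, address) and drop employees with an empty address,
// mirroring the intent of the original code.
val idToAddress = leftRdd.join(rightRdd)
  .map { case (id, (_, addressRow)) => (id, addressRow.getStringOption("address").getOrElse("")) }
  .filter { case (_, address) => address.nonEmpty }

println(idToAddress.count())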