提问者:小点点

在不引起序列化问题的情况下丰富 SparkContext


我尝试使用Spark来处理来自HBase表的数据。这篇博文举例说明了如何使用< code>NewHadoopAPI从任何Hadoop InputFormat中读取数据。

我所做的一切

由于我需要多次执行此操作,因此我试图使用隐含来丰富SparkContext,以便我可以从HBase中的一组给定列中获取RDD。我编写了以下助手:

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
  def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
    data map { case (cf, columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed", true)
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, table)

    conf
  }

  def hbase[A](table: String, data: Map[String, List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
        Bytes.toString(key.get) -> extract(data, row, interpret)
      }

}

它可以像这样使用

val rdd = sc.hbase[String](table, Map(
  "cf" -> List("col1", "col2")
))

在本例中,我们得到一个RDD<code>(String,Map[String,Map[Sring,String]])</code>,其中第一个组件是rowkey,第二个组件是Map,其键是列族,值是Map,键是列,内容是单元格值。

失败的地方

不幸的是,似乎我的工作得到了对 sc 的引用,它本身在设计上是不可序列化的。当我运行作业时,我得到的是

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

我可以删除助手类,并在我的工作中使用相同的逻辑,一切运行良好。但是我想得到一些可以重用的东西,而不是一遍又一遍地写同样的样板文件。

顺便说一句,这个问题并不是隐式的,即使使用sc的函数也会出现同样的问题。

相比之下,以下读取 TSV 文件的帮助程序(我知道它坏了,因为它不支持引用等等,没关系)似乎工作正常:

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields, contents).zipped.toMap
  }
}

如何封装从HBase读取行的逻辑,而不会无意中捕获SparkContext?


共1个答案

匿名用户

只需将@transient注释添加到 sc 变量:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

并确保 SC 未在提取函数中使用,因为它在工作线程上不可用。

如果需要从分布式计算中访问 Spark 上下文,可以使用 rdd.context 函数:

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k, v) => 
    val ctx = rdd.context
    ....
}