提问者:小点点

火花序列化错误


我正在努力学习spark scala。我想读HBase,但没有mapreduce。我创建了一个简单的HBase表“test”,并在其中进行了3次输入。我想通过spark(没有使用mapreduce的HBaseTest)阅读它。我尝试在shell上运行以下命令

val numbers = Array(
  new Get(Bytes.toBytes("row1")), 
  new Get(Bytes.toBytes("row2")), 
  new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()

我不断收到错误 - org.apache.spark.SparkException: Job aborted: Task Not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.HBaseConfiguration

有人能帮我吗,我如何创建一个使用可序列化配置的Htable

谢谢


共2个答案

匿名用户

您的问题是< code>table不可序列化(确切地说,它是成员< code>conf),并且您试图通过在< code>map中使用它来序列化它。他们的方式,你试图读取HBase是不太正确的,它看起来像你尝试一些特定的Get,然后试图并行地做它们。即使您真的做到了这一点,这也不会像您将要执行的随机读取那样伸缩。您想要做的是使用Spark执行表扫描,下面的代码片段应该可以帮助您完成:

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

这将为您提供一个RDD,其中包含构成行的NaviagableMap。以下是如何将NaviagbleMap更改为字符串的普通Scala映射:

...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (Bytes.toString(cf._1), cf._2.map(col =>
      (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))

最后一点,如果你真的想尝试并行执行随机读取,我相信你可以把HBase表初始化放在map中。

匿名用户

当你这样做时会发生什么

@瞬时值conf=新的HBaseConfiguration

更新 显然,HBase 提交任务的其他部分也是不可序列化的。其中每一项都需要解决。

>

  • 考虑实体是否在线路的两侧具有相同的含义/语义学。任何连接肯定不会。HBaseConfigation不应该被序列化。但是在原语之上构建的原语和简单对象——并且不包含上下文相关数据——可以包含在序列化中

    对于上下文敏感的实体(包括HBaseConfiguration和任何面向连接的数据结构),您应该将它们标记为@transient,然后在readObject()方法中使用与客户端环境相关的值来实例化它们。