我正在努力学习spark scala。我想读HBase,但没有mapreduce。我创建了一个简单的HBase表“test”,并在其中进行了3次输入。我想通过spark(没有使用mapreduce的HBaseTest)阅读它。我尝试在shell上运行以下命令
val numbers = Array(
new Get(Bytes.toBytes("row1")),
new Get(Bytes.toBytes("row2")),
new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()
我不断收到错误 - org.apache.spark.SparkException: Job aborted: Task Not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.HBaseConfiguration
有人能帮我吗,我如何创建一个使用可序列化配置的Htable
谢谢
您的问题是< code>table不可序列化(确切地说,它是成员< code>conf),并且您试图通过在< code>map中使用它来序列化它。他们的方式,你试图读取HBase是不太正确的,它看起来像你尝试一些特定的Get,然后试图并行地做它们。即使您真的做到了这一点,这也不会像您将要执行的随机读取那样伸缩。您想要做的是使用Spark执行表扫描,下面的代码片段应该可以帮助您完成:
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
这将为您提供一个RDD,其中包含构成行的NaviagableMap。以下是如何将NaviagbleMap更改为字符串的普通Scala映射:
...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
navMap.asScala.toMap.map(cf =>
(cf._1, cf._2.asScala.toMap.map(col =>
(col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
navMap.map(cf =>
(Bytes.toString(cf._1), cf._2.map(col =>
(Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))
最后一点,如果你真的想尝试并行执行随机读取,我相信你可以把HBase表初始化放在map
中。
当你这样做时会发生什么
@瞬时值conf=新的HBaseConfiguration
更新 显然,HBase 提交任务的其他部分也是不可序列化的。其中每一项都需要解决。
>
考虑实体是否在线路的两侧具有相同的含义/语义学。任何连接肯定不会。HBaseConfigation不应该被序列化。但是在原语之上构建的原语和简单对象——并且不包含上下文相关数据——可以包含在序列化中
对于上下文敏感的实体(包括HBaseConfiguration和任何面向连接的数据结构),您应该将它们标记为@transient,然后在readObject()方法中使用与客户端环境相关的值来实例化它们。