提问者:小点点

使用 json4s 解析 JSON 时引发不可序列化异常


我在spark作业中尝试解析json时遇到了一个问题。我使用的是< code>spark 1.1.0 、< code>json4s和< code>Cassandra Spark连接器。引发的异常是:

java.io。NotSerializableException:org.json4s.DefaultFormats

检查 DefaultFormats 配套对象,以及这个堆栈问题,很明显 DefaultFormats 无法序列化。现在的问题是该怎么做。

我可以看到这张票通过添加关键字transient在spark代码库中解决了这个问题,但我不确定如何或在哪里将它应用到我的案例中。解决方案是只在执行器上实例化DefaultFormats类,以避免序列化吗?还有人在用的另一个scala/spark的JSON解析库吗?我最初尝试单独使用jackson,但是遇到了一些注释错误,我无法轻易解决,json4s开箱即用。以下是我的代码:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

我做我的json解析在检查Ua函数。我试图使计数懒惰,希望它以某种方式延迟执行,但它没有效果。也许在检查UA中移动隐式val?任何建议非常感谢。


共2个答案

匿名用户

这已经在json4s的开放票证中得到了回答。解决方法是将隐式声明放在函数内部

val count = rdd
               .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
               .reduce((x, y) => x + y) 

匿名用户

当我把< code >隐式val格式=...在包含解析的方法内部声明,而不是在类(对象)上声明。

所以这会抛出一个错误:

object Application {

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}

但这样就好了:

object Application {

  implicit val formats = DefaultFormats

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}