我在spark作业中尝试解析json时遇到了一个问题。我使用的是< code>spark 1.1.0 、< code>json4s和< code>Cassandra Spark连接器。引发的异常是:
java.io。NotSerializableException:org.json4s.DefaultFormats
检查 DefaultFormats 配套对象,以及这个堆栈问题,很明显 DefaultFormats 无法序列化。现在的问题是该怎么做。
我可以看到这张票通过添加关键字transient在spark代码库中解决了这个问题,但我不确定如何或在哪里将它应用到我的案例中。解决方案是只在执行器上实例化DefaultFormats类,以避免序列化吗?还有人在用的另一个scala/spark的JSON解析库吗?我最初尝试单独使用jackson,但是遇到了一些注释错误,我无法轻易解决,json4s开箱即用。以下是我的代码:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)
我做我的json解析在检查Ua函数。我试图使计数懒惰,希望它以某种方式延迟执行,但它没有效果。也许在检查UA中移动隐式val?任何建议非常感谢。
这已经在json4s的开放票证中得到了回答。解决方法是将隐式
声明放在函数内部
val count = rdd
.map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
.reduce((x, y) => x + y)
当我把< code >隐式val格式=...在包含解析的方法内部声明,而不是在类(对象)上声明。
所以这会抛出一个错误:
object Application {
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
implicit val formats = DefaultFormats
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}
但这样就好了:
object Application {
implicit val formats = DefaultFormats
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}