Asked by: 小点点

Spark/Java serialization problem - org.apache.spark.SparkException: Task not serializable


While writing a Spark application in Java, I have run into a problem with the following code:

public class BatchLayerDefaultJob implements Serializable {

    private static Function<BatchLayerProcessor, Future> batchFunction = new Function<BatchLayerProcessor, Future>() {
        @Override
        public Future call(BatchLayerProcessor s) {
            return executor.submit(s);
        }
    };

    public void applicationRunner(BatchParameters batchParameters) {

        SparkConf sparkConf = new SparkConf().setAppName("Platform Engine - Batch Job");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        sparkConf.set("spark.cores.max", "1");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        List<BatchLayerProcessor> batchListforRDD = new ArrayList<BatchLayerProcessor>();

        // populate List here.... Then attempt to process below

        JavaRDD<BatchLayerProcessor> distData = sparkContext.parallelize(batchListforRDD, batchListforRDD.size());
        JavaRDD<Future> result = distData.map(batchFunction);
        result.collect(); // <-- Produces an object not serializable exception here

I have tried a number of things to no avail, including extracting batchFunction into a separate class outside the main class, and I have also tried using mapPartitions instead of map. I am more or less out of ideas. Any help would be much appreciated.
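Roughly, the extracted version looked like this (a sketch only; the class and field names here are illustrative). As the stack trace below shows, the failure is the RDD's own elements (BatchLayerProcessor) failing to serialize, so extracting the function on its own did not help:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.spark.api.java.function.Function;

// Illustrative standalone version of batchFunction. Pulling it out of the
// enclosing class avoids capturing that class in the closure, but the task
// still has to ship the BatchLayerProcessor elements themselves.
public class BatchSubmitFunction implements Function<BatchLayerProcessor, Future> {

    private final ExecutorService executor;

    public BatchSubmitFunction(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public Future call(BatchLayerProcessor s) throws Exception {
        return executor.submit(s);
    }
}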

The stack trace is below:

17/11/30 17:11:28 INFO DAGScheduler: Job 0 failed: collect at 
BatchLayerDefaultJob.java:122, took 23.406561 s
Exception in thread "Thread-8" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: 
java.io.NotSerializableException: xxxx.BatchLayerProcessor
Serialization stack:
- object not serializable (class: xxxx.BatchLayerProcessor, value: xxxx.BatchLayerProcessor@3e745097)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(xxxx.BatchLayerProcessor@3e745097))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@691)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 0))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)

Cheers.

EDIT: Added BatchLayerProcessor as requested, slightly abridged:

public class BatchLayerProcessor implements Runnable, Serializable {
    private int interval, backMinutes;
    private String scoreVal, batchjobid;
    private static CountDownLatch countDownLatch;

    public void run() {
        /* Get a reference to the ApplicationContextReader, a singleton */
        ApplicationContextReader applicationContextReaderCopy = ApplicationContextReader.getInstance();

        synchronized (BatchLayerProcessor.class) { /* Protect singleton member variable from multithreaded access. */
            if (applicationContextReader == null) /* If local reference is null... */
                applicationContextReader = applicationContextReaderCopy; /* ...set it to the singleton */
        }

        if (getxScoreVal().equals("")) {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        }
        else {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        }

        countDownLatch.countDown();
    }

1 Answer

Anonymous user

I decided to change BatchLayerProcessor so that it is no longer Runnable, and to rely on Spark to do the work for me instead.
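A minimal sketch of that approach (process() is a hypothetical stand-in for the work formerly done in run()); the ExecutorService/Future indirection is dropped so the closure no longer captures anything non-serializable, and Spark itself drives the per-element work:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BatchLayerDefaultJob implements Serializable {

    public void applicationRunner(BatchParameters batchParameters) {
        SparkConf sparkConf = new SparkConf().setAppName("Platform Engine - Batch Job");
        sparkConf.set("spark.cores.max", "1");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        List<BatchLayerProcessor> batchListforRDD = new ArrayList<>();
        // populate the list here...

        JavaRDD<BatchLayerProcessor> distData =
                sparkContext.parallelize(batchListforRDD, batchListforRDD.size());

        // Spark runs each processor on the executors; no ExecutorService and
        // no Future, so nothing non-serializable is captured by the closure.
        // process() is a hypothetical method holding the work formerly in run().
        distData.foreach(processor -> processor.process());
    }
}

Note that for this to work, every non-transient instance field reachable from BatchLayerProcessor must itself be serializable.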