Asked by: 小点点

Apache Spark: java.lang.ClassCastException when running foreachPartition on a remote master node [duplicate]


I have a Java microservice that connects to an Apache Spark cluster and uses the Datastax Spark-Cassandra connector to persist data to an Apache Cassandra DB cluster.

I wrote the following method to delete data for a specific date range from a Cassandra table.

The exact code is shown below:

public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}
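As an aside, the DELETE statement above is rebuilt by string concatenation for every row. A sketch of the same loop using a prepared statement is shown below; it assumes the DataStax Java driver 3.x API (com.datastax.driver.core.PreparedStatement) that the Session returned by openSession() exposes, and that binding the date strings matches the column's type — verify both against your driver version:

```java
// Sketch only: assumes the DataStax driver 3.x Session API and string-typed
// timestamp bounds, as in the original query. Prepare once per partition
// instead of re-parsing the CQL string for every row.
df.foreachPartition(partition -> {
    Session session = connector.openSession();
    PreparedStatement stmt = session.prepare(
        "DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
        + " WHERE nodeid = ? AND timestamp >= ? AND timestamp <= ?");
    while (partition.hasNext()) {
        Row row = partition.next();
        session.execute(stmt.bind(row.getString(0), fromDate, toDate));
    }
    session.close();
});
```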

    @Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();
}
The code runs fine when executed against a local Spark master node (the .master("local[*]") option).

However, when I try to execute the same code while connected to a remote Spark master node, the following error occurs:

Driver stacktrace: ... with root cause:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.func$4 of type org.apache.spark.api.java.function.ForeachPartitionFunction in instance of org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner...
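For context on why this only bites on a remote master: a Java lambda is serialized as a java.lang.invoke.SerializedLambda, and on deserialization the receiving JVM must resolve it back to a synthetic method in the class that defined the lambda — so that class must be on the executor's classpath. A minimal plain-Java round trip (no Spark; SerFunc is a hypothetical interface just for this demo) shows the mechanism that works locally and fails when the defining class is missing remotely:

```java
import java.io.*;

public class LambdaSerDemo {
    // Hypothetical functional interface; extending Serializable makes the
    // compiler emit a serializable lambda, like Spark's function interfaces do.
    interface SerFunc extends Serializable {
        int apply(int x);
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o); // written as a java.lang.invoke.SerializedLambda
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // Resolving the SerializedLambda needs LambdaSerDemo on the classpath;
            // on a Spark executor without the application jar, this step fails.
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerFunc doubler = x -> x * 2;
        SerFunc restored = (SerFunc) deserialize(serialize(doubler));
        System.out.println(restored.apply(21)); // prints 42
    }
}
```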

Update 1

The trick that seems to work for me is adding the following line to the Spark session configuration:

.config("spark.jars", "meter-service-1.0.jar")

This seems to provide the missing dependency that was preventing Spark from correctly deserializing the lambda expression on the remote nodes.

This is explained better here
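This issue typically only shows up with an embedded driver like this microservice; when a job is launched through spark-submit, the application jar is shipped to the executors automatically, which is the same effect spark.jars achieves here. A sketch (master URL and main class are examples, not from this project):

```shell
# Sketch: launching the same job via spark-submit instead of an embedded
# SparkSession. The application jar (last argument) is distributed to the
# executors automatically, so no spark.jars setting is needed.
spark-submit \
  --master spark://spark-master:7077 \
  --class com.example.MeterServiceApp \
  meter-service-1.0.jar
```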


1 Answer

Anonymous user

My Java is rusty, but could you try extracting the lambda into a method?

public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
        .filter(col("timestamp")
                .substr(1, 10)
                .between(fromDate, toDate))
        .select("nodeid");


    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}
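A further variant of the same idea is a named static class. ForeachPartitionFunction already extends Serializable, so the class only needs to avoid capturing non-serializable state; note that, per Update 1, the class still has to reach the executors via the application jar, otherwise the failure just shifts to a ClassNotFoundException. A sketch (the class name DeletePartition is made up for illustration):

```java
// Sketch: a named static class in place of the lambda/anonymous class.
// All captured state is passed explicitly through serializable fields.
public static class DeletePartition implements ForeachPartitionFunction<Row> {
    private final CassandraConnector connector;
    private final String keyspace;
    private final String table;
    private final String fromDate;
    private final String toDate;

    public DeletePartition(CassandraConnector connector, String keyspace,
                           String table, String fromDate, String toDate) {
        this.connector = connector;
        this.keyspace = keyspace;
        this.table = table;
        this.fromDate = fromDate;
        this.toDate = toDate;
    }

    @Override
    public void call(Iterator<Row> partition) {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + keyspace + "." + table
                + " WHERE nodeid = '" + row.mkString() + "'"
                + " AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    }
}
```

Used as: df.foreachPartition(new DeletePartition(connector, CassandraProperties.KEYSPACE, CassandraProperties.ENERGY_FORECASTS, fromDate, toDate));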