I have a Java microservice that connects to an Apache Spark cluster and uses the DataStax Spark-Cassandra Connector to persist data to an Apache Cassandra cluster.
I wrote the following method to delete data for a specific date range from a Cassandra table.
The code is shown below:
public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {{
                put("keyspace", CassandraProperties.KEYSPACE);
                put("table", CassandraProperties.ENERGY_FORECASTS);
            }})
            .load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
                    + " WHERE nodeid = '" + row.mkString() + "'"
                    + " AND timestamp >= '" + fromDate + "'"
                    + " AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}
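The per-row DELETE above is assembled by raw string concatenation, which is easy to get wrong (unbalanced quotes, and unescaped `nodeid` values make it injection-prone). As a minimal sketch, a hypothetical helper such as `CqlDeleteBuilder` below isolates the statement shape in one place so it can be unit-tested; the class and argument names are illustrative, not part of the original service.

```java
// Hypothetical helper: builds the same range DELETE as the snippet above,
// but in one testable place instead of inline concatenation.
public class CqlDeleteBuilder {

    /** Assembles a range DELETE for one partition key value. */
    public static String rangeDelete(String keyspace, String table,
                                     String nodeId, String fromDate, String toDate) {
        return "DELETE FROM " + keyspace + "." + table
                + " WHERE nodeid = '" + nodeId + "'"
                + " AND timestamp >= '" + fromDate + "'"
                + " AND timestamp <= '" + toDate + "'";
    }

    public static void main(String[] args) {
        System.out.println(rangeDelete("ks", "energy_forecasts",
                "node-1", "2019-01-01", "2019-01-31"));
    }
}
```

In production code a prepared statement with bind variables (`session.prepare(...)` plus `bind(...)` in the DataStax driver) would be safer than any string concatenation.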
@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();
}
The code executes fine when run with a local Spark master node (the .master("local[*]") option).
However, when I try to execute the same code while connected to a remote Spark master node, the following error occurs:
Driver stacktrace: [...] with root cause
java.lang.ClassCastException: cannot assign instance of java.io.SerializedLambda to field org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.func$4 of type org.apache.spark.api.java.function.ForeachPartitionFunction in instance of org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner...
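The trace involves `java.io.SerializedLambda` because Java does not serialize a lambda's implementation directly: the stream carries a `SerializedLambda` descriptor that is resolved against the *capturing class* when it is read back, so the deserializing JVM (here, a remote Spark executor) must have that class on its classpath. A minimal plain-Java sketch of the round trip, with no Spark involved (the interface name `SerFn` is illustrative, standing in for `ForeachPartitionFunction`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A serializable functional interface, standing in for Spark's ForeachPartitionFunction.
interface SerFn extends Serializable {
    int apply(int x);
}

public class LambdaRoundTrip {

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o); // the lambda is written as a java.io.SerializedLambda
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // Resolving the SerializedLambda needs the capturing class on the
            // classpath -- exactly what a remote executor lacks when the
            // application jar has not been shipped to it.
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerFn inc = x -> x + 1;
        SerFn copy = (SerFn) deserialize(serialize(inc));
        System.out.println(copy.apply(41)); // prints 42 in the same JVM
    }
}
```

In the same JVM the round trip succeeds; on an executor without the application classes, the resolution step fails and surfaces as the `ClassCastException` above.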
UPDATE 1

The trick that seems to have worked for me was adding the following line to the Spark session configuration:

.config("spark.jars", "meter-service-1.0.jar")

This appears to provide the missing dependencies that were preventing Spark from correctly deserializing the lambda expression on the remote nodes.
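Rather than hard-coding the jar name, the jar (or classes directory) a class was loaded from can usually be discovered through its `ProtectionDomain`. A hedged sketch under that assumption, with an illustrative class name (`JarLocator` is not part of the original service, and the code source can be null for JDK bootstrap classes):

```java
import java.security.CodeSource;

public class JarLocator {

    /**
     * Returns the classpath entry (jar file or classes directory) that the
     * given class was loaded from, or null if it has no code source
     * (e.g. JDK bootstrap classes).
     */
    public static String locate(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? null : src.getLocation().getPath();
    }

    public static void main(String[] args) {
        // For an application class this is the path you would pass to "spark.jars".
        System.out.println(locate(JarLocator.class));
    }
}
```

That would allow something like `.config("spark.jars", JarLocator.locate(MeterService.class))` (with `MeterService` being whatever class lives in the service jar), so the configuration survives version bumps in the jar name.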
This is explained better here.
My Java is rusty, but could you try extracting the lambda into a method?
public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {{
                put("keyspace", CassandraProperties.KEYSPACE);
                put("table", CassandraProperties.ENERGY_FORECASTS);
            }})
            .load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        @Override
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
                        + " WHERE nodeid = '" + row.mkString() + "'"
                        + " AND timestamp >= '" + fromDate + "'"
                        + " AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}