我正在编写一个 Java 应用程序。我有一个火花数据集
Dataset<MyObject> dataset = sparkSession.createDataset(someRDD, Encoders.javaSerialization(MyObject.class));
dataset.printSchema();
//root
//|-- value: binary (nullable = true)
MyObject
有不同的(嵌套的)字段,我想在数据集中的多个列中“分解”它们。新列还需要根据MyObject
中的多个属性计算。作为解决方案,我可以使用。withColumn()
并应用UDF。不幸的是,我不知道如何在UDF中接受二进制类型,然后将其转换为MyObject
。有什么建议吗?
多亏了布莱克主教的建议,我解决了它。以下是完整的解决方案:
您需要注册UDF:
UDFRegistration udfRegistration = sparkSession.sqlContext().udf();
udfRegistration.register("extractSomeLong", extractSomeLong(), DataTypes.LongType);
声明并实现 UDF。第一个参数必须是 byte[],您需要将字节数组转换为您的对象,如下所示:
private static UDF1<byte[], Long> extractSomeLong() {
return (byteArray) -> {
if (byteArray != null) {
ByteArrayInputStream in = new ByteArrayInputStream(byteArray);
ObjectInputStream is = new ObjectInputStream(in);
MyObject traceWritable = (MyObject) is.readObject();
return traceWritable.getSomeLong();
}
else {
return -1L;
}
};
}
最后,它可以用于:
Dataset<MyObject> data = sparkSession.createDataset(someRDD, Encoders.javaSerialization(MyObject.class));
Dataset<Row> processedData = data.withColumn( "ID", functions.callUDF( "extractSomeLong", new Column("columnName")))