我正在使用Java将JSON转换为Avro,并使用Google DataFlow将其存储到GCS。Avro模式是使用SchemaBuilder在运行时创建的。
我在模式中定义的字段之一是可选的LONG字段,它是这样定义的:
SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record(mainName).fields();
Schema concreteType = SchemaBuilder.nullable().longType();
fields.name("key1").type(concreteType).noDefault();
现在,当我使用上面的模式创建GenericRecord时,并且“key1”未设置,当将结果GenericRecord放在我的DoFn的上下文中时:context.output(res);
我得到以下错误:
异常在线程"main"org.apache.beam.sdk.Pipeline$PipelineExecutionException:org.apache.avro.未解析的UnionException: not in Union["long","null"]: 256
我还尝试使用with Default(0L)
做同样的事情,得到了相同的结果。
我错过了什么?谢谢
当我尝试如下操作时,它对我很好,您可以尝试打印有助于比较的模式,也可以删除null()以供长类型尝试。
fields.name("key1").type().nullable().longType().longDefault(0);
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.RecordBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import java.io.File;
import java.io.IOException;
public class GenericRecordExample {
public static void main(String[] args) {
FieldAssembler<Schema> fields;
RecordBuilder<Schema> record = SchemaBuilder.record("Customer");
fields = record.namespace("com.example").fields();
fields = fields.name("first_name").type().nullable().stringType().noDefault();
fields = fields.name("last_name").type().nullable().stringType().noDefault();
fields = fields.name("account_number").type().nullable().longType().longDefault(0);
Schema schema = fields.endRecord();
System.out.println(schema.toString());
// we build our first customer
GenericRecordBuilder customerBuilder = new GenericRecordBuilder(schema);
customerBuilder.set("first_name", "John");
customerBuilder.set("last_name", "Doe");
customerBuilder.set("account_number", 999333444111L);
Record myCustomer = customerBuilder.build();
System.out.println(myCustomer);
// writing to a file
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(myCustomer.getSchema(), new File("customer-generic.avro"));
dataFileWriter.append(myCustomer);
System.out.println("Written customer-generic.avro");
} catch (IOException e) {
System.out.println("Couldn't write file");
e.printStackTrace();
}
// reading from a file
final File file = new File("customer-generic.avro");
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
GenericRecord customerRead;
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)){
customerRead = dataFileReader.next();
System.out.println("Successfully read avro file");
System.out.println(customerRead.toString());
// get the data from the generic record
System.out.println("First name: " + customerRead.get("first_name"));
// read a non existent field
System.out.println("Non existent field: " + customerRead.get("not_here"));
}
catch(IOException e) {
e.printStackTrace();
}
}
}
如果我正确理解您的问题,您正在尝试接受JSON字符串并将它们保存在Cloud Storage存储桶中,在数据通过数据流时使用Avro作为数据的编码器。在我看来,您的代码没有任何明显的错误。我已经这样做了,包括将数据保存到Cloud Storage和BigQuery。
您可以考虑使用一种更简单且可能更不容易出错的方法:为您的数据定义一个Java类,并在其上使用Avro注释以使编码器能够正常工作。这是一个例子:
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Data {
public long nonNullableValue;
@Nullable public long nullableValue;
}
然后,在您的DnFn
实现中使用此类型,就像您可能已经使用的那样。Beam应该能够使用Avro在工作人员之间正确移动数据,即使标记为@Nullable
的字段为空。