我是数据流新手,我将把以下代码片段从Java SDK 1.9.0迁移到2.3.0:
//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
PubsubIO.Read.named("Read from Pubsub")
.topic(myTopic)
.withCoder(SerializableCoder.of(MyType.class))
.timestampLabel("myDate"));
我会把它转换成
//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));
但从java SDK 2.3.0开始,PubsubIO.read()
方法是私有的。
因此,我需要使用带有<code>MyType</code>的序列化实例的消息,但<code>PubsubIO</code〕公开的方法似乎只适用于文本消息、avro、protobuf等。
如何从包含序列化java对象的消息中读取PubsubIO主题?
更新:
我可以这样调整它(尚未尝试)…
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.readMessagesWithAttributes ()
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
@Override
public MyType apply (final PubsubMessage message) {
final byte[] payload = message.getPayload ();
try {
try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
return (MyType) stream.readObject ();
}
} catch (IOException e) {
throw new RuntimeException (e);
} catch (ClassNotFoundException e) {
throw new RuntimeException (e);
}
}
}))
更新后的代码看起来应该可以工作了。请注意,如果您不使用属性映射,还有PubsubIO.readPubsubMessagesWithotAt
PR#2634中删除了之前的功能,取而代之的是最常见编码类型(proto、avro、Strings)的专用方法。
我怀疑通过< code>SerializableCoder的任意对象解码没有被保留,因为依赖Java序列化存在固有的危险。参见< code > serializable coder javadoc或相关问题Java序列化-优缺点,使用还是避免?。但是,如果您觉得缺少API,Beam SDK是开源的,社区欢迎贡献。