提问者:小点点

DataFlow SDK 2. x:如何使用java序列化从PubSubIO使用


我是数据流新手,我将把以下代码片段从Java SDK 1.9.0迁移到2.3.0:

//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
  PubsubIO.Read.named("Read from Pubsub")
  .topic(myTopic)
  .withCoder(SerializableCoder.of(MyType.class))
  .timestampLabel("myDate"));

我会把它转换成

//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));

但从java SDK 2.3.0开始,PubsubIO.read()方法是私有的。

因此,我需要使用带有<code>MyType</code>的序列化实例的消息,但<code>PubsubIO</code〕公开的方法似乎只适用于文本消息、avro、protobuf等。

如何从包含序列化java对象的消息中读取PubsubIO主题?

更新:

我可以这样调整它(尚未尝试)…

PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.readMessagesWithAttributes ()
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
        @Override
        public MyType apply (final PubsubMessage message) {
            final byte[] payload = message.getPayload ();
            try {
                try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
                    return (MyType) stream.readObject ();
                }
            } catch (IOException e) {
                throw new RuntimeException (e);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException (e);
            }
        }
    }))

共1个答案

匿名用户

更新后的代码看起来应该可以工作了。请注意,如果您不使用属性映射,还有PubsubIO.readPubsubMessagesWithotAt

PR#2634中删除了之前的功能,取而代之的是最常见编码类型(proto、avro、Strings)的专用方法。

我怀疑通过< code>SerializableCoder的任意对象解码没有被保留,因为依赖Java序列化存在固有的危险。参见< code > serializable coder javadoc或相关问题Java序列化-优缺点,使用还是避免?。但是,如果您觉得缺少API,Beam SDK是开源的,社区欢迎贡献。