提问者:小点点

当在谷歌数据流上运行apache beam时,日语字符被损坏


我正在谷歌数据流上运行apache beam管道。它从GCS桶中读取数据,并在处理后写入GCS桶。这条管道处理日本的数据。在堆栈驱动程序日志中,日文字符显示正常。但是当我看到o/p桶中的数据时,它已损坏。所以我主要在想,要么在向GCS写入数据时,编码器没有设置,要么我们必须改变GCS文件格式。需要帮助来解决这个问题。

我已经尝试过在光束管道中设置编码。同样,在运行pipleine时,我尝试使用parma编码来运行jar

-Dfile.encoding=EUC-JP-jar目标/jarname--其他光束选项

Beam版本2.14.0


import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class SimpleRunner {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRunner.class);

    static TupleTag<String> successRecord = new TupleTag<String>() {

        private static final long serialVersionUID = 3L;
    };


    static class JapaneseCharProcessor extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        public JapaneseCharProcessor() {
        }

        @ProcessElement
        public void processElement(@Element String record, MultiOutputReceiver out, ProcessContext c) {
            LOG.info("processElement  {}", record);
            c.output("processed Recors:" + record);

        }

    }


    public static void main(String[] args)  {

        IExtractorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IExtractorOptions.class);
        final List<String> LINES = Arrays.asList(
                  "あらゆる情報を簡単に整理・投稿ができる!",
                  "ブログランキング",
                  "ブログを作る・楽しむ",
                  "ショッピングカート");
        Pipeline pipeline = Pipeline.create(options);

        LOG.info("options configured  {}", options);
        FileSystems.setDefaultPipelineOptions(options);

        CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(String.class, StringUtf8Coder.of());

        pipeline.apply("Read from input files",
                Create.of(LINES)).setCoder(StringUtf8Coder.of())
                .apply("Process input files", ParDo.of(new JapaneseCharProcessor()))
                .apply("Writing output to success",
                        (TextIO.write().to(options.getSuccessRecordBucketURL()).withNumShards(10)));
        pipeline.run().waitUntilFinish();
        LOG.info("pipeLine execution completed");

    }

}

实际结果 : 已处理的 Recors:ã'ã'‰ã'†ã'‹æƒ...å ±ã''ç°¡å ̃ã«æ•'ç†ãƒ»æŠ•ç ̈¿ãŒã§ã'‹ï1/4

预期结果:已处理记录:あらゆる情報を簡単に整理・投稿ができる!


共1个答案

匿名用户

我检查了您的代码,适用于DirectRunnerDataflowRunner和2.16.0 SDK:

$ gsutil cat gs://$BUCKET/japanese/output-cloud/result.txt-00000-of-00002
processed Recors:あらゆる情報を簡単に整理・投稿ができる!
processed Recors:ショッピングカート

但是,即使内容是正确的,如果我使用浏览器和GCS控制台检查文件,我也会看到奇怪的字符: