我正在谷歌数据流上运行apache beam管道。它从GCS桶中读取数据,并在处理后写入GCS桶。这条管道处理日本的数据。在堆栈驱动程序日志中,日文字符显示正常。但是当我看到o/p桶中的数据时,它已损坏。所以我主要在想,要么在向GCS写入数据时,编码器没有设置,要么我们必须改变GCS文件格式。需要帮助来解决这个问题。
我已经尝试过在光束管道中设置编码。同样,在运行pipleine时,我尝试使用parma编码来运行jar
-Dfile.encoding=EUC-JP-jar目标/jarname--其他光束选项
Beam版本2.14.0
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleRunner {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRunner.class);
static TupleTag<String> successRecord = new TupleTag<String>() {
private static final long serialVersionUID = 3L;
};
static class JapaneseCharProcessor extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
public JapaneseCharProcessor() {
}
@ProcessElement
public void processElement(@Element String record, MultiOutputReceiver out, ProcessContext c) {
LOG.info("processElement {}", record);
c.output("processed Recors:" + record);
}
}
public static void main(String[] args) {
IExtractorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IExtractorOptions.class);
final List<String> LINES = Arrays.asList(
"あらゆる情報を簡単に整理・投稿ができる!",
"ブログランキング",
"ブログを作る・楽しむ",
"ショッピングカート");
Pipeline pipeline = Pipeline.create(options);
LOG.info("options configured {}", options);
FileSystems.setDefaultPipelineOptions(options);
CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(String.class, StringUtf8Coder.of());
pipeline.apply("Read from input files",
Create.of(LINES)).setCoder(StringUtf8Coder.of())
.apply("Process input files", ParDo.of(new JapaneseCharProcessor()))
.apply("Writing output to success",
(TextIO.write().to(options.getSuccessRecordBucketURL()).withNumShards(10)));
pipeline.run().waitUntilFinish();
LOG.info("pipeLine execution completed");
}
}
实际结果 : 已处理的 Recors:ã'ã'‰ã'†ã'‹æƒ...å ±ã''ç°¡å ̃ã«æ•'ç†ãƒ»æŠ•ç ̈¿ãŒã§ã'‹ï1/4
预期结果:已处理记录:あらゆる情報を簡単に整理・投稿ができる!
我检查了您的代码,适用于DirectRunner
和DataflowRunner
和2.16.0 SDK:
$ gsutil cat gs://$BUCKET/japanese/output-cloud/result.txt-00000-of-00002
processed Recors:あらゆる情報を簡単に整理・投稿ができる!
processed Recors:ショッピングカート
但是,即使内容是正确的,如果我使用浏览器和GCS控制台检查文件,我也会看到奇怪的字符: