提问者:小点点

转换一个大的jsonl文件与未知的json属性到csv使用apache光束谷歌数据流和java


如何使用Apache Beam,谷歌数据流和java将具有未知json属性的大型jsonl文件转换为csv

这是我的场景:

  1. 一个大的jsonl文件在谷歌存储中
  2. Json属性未知,因此无法在Beam的管道中定义使用Apache Beam的Schema。
  3. 使用Apache光束、google数据流和java将jsonl转换为csv
  4. 转换完成后,将csv存储在google存储中(存储jsonl的同一个存储桶)
  5. 通过某种方式通知,例如transformation_done=true如果可能(rest api或事件)

任何帮助或指导都会有所帮助,因为我是Apache Beam的新手,尽管我正在阅读Apache Beam的文档。

我已经用示例JSONL数据编辑了问题

{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}
{"Name":"May", "Session":"2012B", "Score":"14", "Completed":"false"}
{"Name":"Deloise", "Session":"2012A", "Score":"19", "Completed":"true"} 

虽然json key存在于输入文件中,但在转换时它是未知的。我将通过一个例子来解释这一点,假设我有三个客户端,每个客户端都有自己的google存储空间,因此每个客户端都上传了具有不同json属性的自己的jsonl文件。

客户端1:输入Jsonl文件

{"city":"Mumbai", "pincode":"2012A"} 
{"city":"Delhi", "pincode":"2012N"} 

客户端2:输入Jsonl文件

{"Relation":"Finance", "Code":"2012A"} 
{"Relation":"Production", "Code":"20XXX"} 

客户端3:输入Jsonl文件

{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}

问题:我如何编写一个通用的光束管道,它可以将所有三个转换为如下所示

客户端1:输出CSV文件

["city", "pincode"] 
["Mumbai","2012A"] 
["Delhi", "2012N"] 

客户端2:输出CSV文件

["Relation", "Code"] 
["Finance", "2012A"] 
["Production","20XXX"] 

客户端3:输出CSV文件

["Name", "Session", "Score", "true"]
["Gilbert", "2013", "24", "true"]
["Alexa", "2013", "29", "true"]

共1个答案

匿名用户

编辑:删除了之前的问题,因为问题已通过示例进行了修改。

任何人都没有提供通用的方法来实现这样的结果。您必须根据您的需求和处理管道的方式自己编写逻辑。

下面有一些示例,但您需要针对您的情况验证这些示例,因为我只在一个小的JSONL文件上尝试过这些示例。

方法1
如果您可以收集输出csv的标头值,那么它会容易得多。但是事先获取标头本身又是一个挑战。

//pipeline
pipeline.apply("ReadJSONLines",
                TextIO.read().from("FILE URL"))
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processLines(@Element String line, OutputReceiver<String> receiver) {                        
                        String values = getCsvLine(line, false);
                        receiver.output(values);

                    }
                }))
            .apply("WriteCSV",
                    TextIO.write().to("FileName")
                            .withSuffix(".csv")
                            .withoutSharding()
                            .withDelimiter(new char[] { '\r', '\n' })
                            .withHeader(getHeader()));
 private static String getHeader() {
        String header = "";
        //your logic to get the header line.
        return header;
    }

获取标题行的可能方法(仅假设在您的情况下可能不起作用):

  • 您可以在GCS中有一个文本文件,它将存储特定JSON文件的标题。在您的逻辑中,您可以通过读取文件来获取标头,查看SO线程了解如何从GCS读取文件
  • 您可以尝试将标头作为运行时参数传递,但这取决于您如何配置和执行管道。

方法2
这是我为小JsonFiles(~10k行)找到的解决方法。下面的示例可能不适用于大文件。

final int[] count = { 0 };
pipeline.apply(//read file)
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processLines(@Element String line, OutputReceiver<String> receiver) {

                        // check if its the first processing element. If yes then create the header
                        if (count[0] == 0) {
                            String header = getCsvLine(line, true);
                            receiver.output(header);
                            count[0]++;
                        }
                        String values = getCsvLine(line, false);
                        receiver.output(values);

                    }
                }))
            .apply(//write file)

正如Saransh在使用FileIO的评论中提到的,您所要做的就是手动逐行读取JSONL,然后将其转换为逗号分隔的format.EG:

pipeline.apply(FileIO.match().filepattern("FILE PATH"))
        .apply(FileIO.readMatches())
        .apply(FlatMapElements
                .into(TypeDescriptors.strings())
                .via((FileIO.ReadableFile f) -> {

                    List<String> output = new ArrayList<>();
                    try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                        String line = br.readLine();
                        while (line != null) {
                            
                            if (output.size() == 0) {
                                String header = getCsvLine(line, true);
                                output.add(header);
                            }
                            String result = getCsvLine(line, false);
                            output.add(result);
                            line = br.readLine();
                        }
                    } catch (IOException e) {
                        throw new RuntimeException("Error while reading", e);
                    }
                    return output;
                }))
            .apply(//write to gcs)

在上面的例子中,我使用了一个getCsvLine方法(为代码可用性而创建),它从文件中获取一行并将其转换为逗号分隔的format.To解析我使用GSON的JSON对象。

/**
 * @param line     take each JSONL line
 * @param isHeader true : Returns output combining the JSON keys || false:
 *                 Returns output combining the JSON values
 **/
public static String getCsvLine(String line, boolean isHeader) {
    List<String> values = new ArrayList<>();
    // convert the line into jsonobject
    JsonObject jsonObject = JsonParser.parseString(line).getAsJsonObject();
    // iterate json object and collect all values
    for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
        if (isHeader)
            values.add(entry.getKey());
        else
            values.add(entry.getValue().getAsString());
    }
    String result = String.join(",", values);
    return result;
}