提问者:小点点

使用数据流将文件名和时间戳添加到BigQuery中的每条记录中


我有几个. txt文件中的数据JSON加载到谷歌BigQuery表。随着文本文件中的列,我需要为每一行插入文件名和当前时间戳。它在GCP数据流中,Python3.7

我使用GCSFileSystem. match和metadata_list访问了包含文件路径和大小的文件数据。

我相信我需要让管道代码在循环中运行,将文件路径传递给ReadFromText,并调用FileNameReadFunction ParDo。

   (p
        | "read from file" >> ReadFromText(known_args.input)
        | "parse" >> beam.Map(json.loads)
        | "Add FileName" >>  beam.ParDo(AddFilenamesFn(), GCSFilePath)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(known_args.output,          
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )

我按照Dataflow/apache光束中的步骤-在传入模式时如何访问当前文件名?但我不能让它完全工作。

感谢任何帮助。


共1个答案

匿名用户

您可以使用textio. ReadFromTextWellFilename而不是ReadFromText。这将生成(filename,line)元组的PCollection。

要在输出json记录中包含文件和时间戳,您可以将“解析”行更改为

| "parse" >> beam.map(lambda (file, line): {
    **json.loads(line),
    "filename": file,
    "timestamp": datetime.now()})