Asked by: 小点点

Google Dataflow: reading from BigQuery in the autocomplete example


I'm starting to learn Dataflow, and I'm working from the autocomplete code example. I'm trying to read the input from BigQuery, but I get this error:

ERROR:root:Error while visiting split
...
File "/usr/lib/python2.7/re.py", line 181, in findall
return _compile(pattern, flags).findall(string)
TypeError: expected string or buffer [while running 'split']

Code:

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    (p  # pylint: disable=expression-not-assigned
     | 'read' >> beam.io.Read(beam.io.BigQuerySource(input_table))
     | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
     | 'TopPerPrefix' >> TopPerPrefix(5)
     | 'format' >> beam.Map(
         lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
     | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
    p.run()

Thanks for your feedback, Tom


1 answer

Anonymous user

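By default, BigQuerySource returns structured records: each element is a mapping from column names to that row's values in the input table. This means you cannot run re.findall directly on a record. re.findall expects a string, which is why the 'split' step fails with the TypeError shown above.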
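The failure can be reproduced without a pipeline. The following is a minimal sketch that assumes a row arrives as a plain Python dict; the column name `my_string_field` and the helper `split_words` are made up for illustration and are not part of the autocomplete example.

```python
import re

# Hypothetical record shaped like one element read from BigQuery:
# a dict keyed by column name (the column name is an assumption).
record = {'my_string_field': "it's a small example"}


def split_words(value):
    """Return the words in value, using the pattern from the question."""
    return re.findall(r"[A-Za-z']+", value)


try:
    # Pass the whole dict, which is what the 'split' step effectively does.
    split_words(record)
    failed = False
except TypeError:
    # re.findall only accepts string-like input, so a dict is rejected.
    failed = True
```
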

Instead, extract the specific field you care about, e.g.:

 | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x['my_string_field']))
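If the column can be NULL or missing, the lambda above would still raise the same TypeError for those rows. One possible way to guard against that is a small named function. This is only a sketch: `extract_words` and `WORD_PATTERN` are hypothetical names, the rows are made-up sample data, and skipping empty values is an assumption about the desired behavior.

```python
import re

# Same word pattern as in the question, compiled once.
WORD_PATTERN = re.compile(r"[A-Za-z']+")


def extract_words(record, field='my_string_field'):
    """Return the words found in one column of a row dict.

    Rows where the column is missing or None yield no words.
    """
    value = record.get(field)
    if value is None:
        return []
    return WORD_PATTERN.findall(value)


# Made-up rows standing in for BigQuery records.
rows = [
    {'my_string_field': "don't panic"},
    {'my_string_field': None},
    {'other_field': 'ignored'},
]
words = [word for row in rows for word in extract_words(row)]
```

In the pipeline, such a function could be used in place of the lambda, for example `| 'split' >> beam.FlatMap(extract_words)`. A non-string column (an integer, say) would still need converting before it is passed to the regular expression.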