提问者:小点点

从同一个Cloud Function执行启动多个批处理数据流作业


我创建了一个自定义模板,它使用ReadFromBigQueryI/O连接器从BigQuery读取。我这样使用它:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        parser.add_argument(
            '--bucket',
            default='mybucketname',
            help='Bucket name for staging, temp and schema files.')

options = PipelineOptions()
args = options.view_as(CustomOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'


def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                          query=args.query,
                                          use_standard_sql=True,
                                          flatten_results=False)
            ...
        )

setup.py

import setuptools

REQUIRED_PACKAGES = [
    'apache-beam',
    'apache-beam[gcp]',
    'google-cloud-storage'
]

setuptools.setup(
    name='ProcessEmailMetrics',
    version='0.0.1',
    description='Workflow to process email metrics.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True
)

最后,这是我如何使数据流API调用我的云函数:

import google.auth
from googleapiclient.discovery import build

credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

query = """
SELECT ...
"""

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
      'query' : query
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)

req.execute()

我通过在侦听Pub/Sub主题的Cloud Function上启动模板进行API调用来开始这项工作。如果我仅在该主题上发布一条消息,则管道将完成,没有任何错误。但是,如果我从同一个Cloud Function执行启动多个作业,则会收到两个不同的错误。

第一个是关于丢失的文件。前两个错误属于这种类型:

访问https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json?alt=media

第二个是索引错误超出范围,同样是在读取ReadFromBigQuery上生成的avro文件时。接下来的三个错误属于这种类型:

2021-08-13 12:03:48.656来自worker的CESTError消息:Traceback(最近一次调用是最后一次):File"/usr/local/lib/python3.7/site-不包/dataflow_worker/native_operations",第651行,do_workwork_executor.执行()File"/usr/local/lib/python3.7/site-不包/dataflow_worker/dataflow_worker",第179行,在执行op.start()File"dataflow_worker/dataflow_worker",第38行,dataflow_worker.native_operations.NativeReadOper.start File"dataflow_worker/native_operations",第39行,dataflow_worker.native_operations.NativeReadOper.start File"dataflow_worker/apache_beam",第44行,dataflow_worker.native_operations.NativeReadOperation.start File"dataflow_worker/_source_bundles",第48行,dataflow_worker.native_operations.NativeReadOper.start File"/usr/local/lib/python3.7/site-不包/apache_beam/io/source_ix",第84行,在self中读取记录._source_bundles[source_ix

在这五个错误发生后,我的管道失败并停止了。

似乎ReadFromBigQuery连接器正在寻找一个包含一些实际上不存在或已被弄乱的BigQuery行的临时文件。

正如我所说的,如果我只启动一个数据流作业,它会毫无错误地完成,所以我有两个假设。

>

  • 可能与我的Cloud Function有关。当两条消息发布时间太接近时,函数没有时间进入睡眠状态,可能文件路径就这样乱了。

    • 创建build数据流服务时,cache_discovery=False选项会产生这个问题吗?

    也许,这是由于我的模板是如何编码的:

    • 可能选项。view_as(SetupOptions)。save_main_session=True选项是问题的关键吗?
    • 在读取/写入BigQuery时,我是否需要以某种方式为每个作业执行提供特定的时间数据集?
    • 上的不同时间位置google_cloud_options.temp_location='gs://{}/tmp/'. format(args.bucket)用于每个作业执行?

    我需要能够在同一个Cloud Function执行上启动多个Dataflow作业,因此实际行为不符合我的项目需求。

    这是我失败工作的ID之一:2021-08-13_02_54_10-11165491620802897150

    知道怎么解决吗?

    更新:

    版本

    python: 3.7.3 (on Cloud Shell)
    beam: 2.31.0 (on Cloud Shell)
    beam: undefined (on setup.py)
    

  • 共1个答案

    匿名用户

    我认为问题是两个管道都在执行Bigquery导出到同一个临时目录中,并且它们相互干扰。您可以为每个管道提供不同的目录,如下所示:

    您可以尝试为ReadFromBigQuery转换提供单独的GCS位置吗?您可以这样做:

    class CustomOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            ...
            parser.add_value_provider_argument(
                '--export_location',
                help='GCS location to perform Bigquery export')
            ...
    

    在您的管道中,您将单独传递此导出位置:

    def run():
        with beam.Pipeline(options=options) as p:
            (
                p
                | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                              query=args.query,
                                              use_standard_sql=True,
                                              flatten_results=False,
                                              gcs_location=options.export_location)
                ...
            )
    

    最后,每次启动管道时都会自动生成一个新管道:

    BODY = {
        'jobName': 'process-data',
        'gcsPath': 'gs://mybucket/templates/my_template',
        'parameters': {
          'query' : query,
          'export_location': 'gs://mybucket/templates/my_template/tmp/' + str(uuid.uuid4())
        }
    }
    
    req = service.projects().locations().templates().create(
        projectId='myproject',
        location='europe-west1',
        body=BODY
    )
    
    req.execute()