Asked by: 小点点

Apache Beam pipeline that reads from a REST API and writes to Google BigQuery


I have been trying to run my pipeline on Dataflow using a classic template.

The pipeline should read from a REST API, and the response returned by the API should then be written to a BigQuery table.

When I limit the API to 1 result it runs successfully, but if I increase the limit to more than 1 it returns an error.

Any help would be much appreciated. Here is my pipeline code:

#!/usr/bin/env python3

import apache_beam as beam
#from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
import setuptools

import logging

# Handling of API calls
import json
import requests

class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self,dummy_start):

        api_url = "https://api.thedogapi.com/v1/breeds/?limit=05"

        logging.debug("Now fetching from ", api_url)

        response = requests.get(api_url)

        return list(response.json())



def run(argv=None):

    beam_options = PipelineOptions(
        runner='direct', #'DataflowRunner',
        region='us-west2',
        project='projectid',
        job_name='api-to-bq-test1', # Dataflow job name
        temp_location='gs://rb-munish-playground/temp',
    )

    # BigQuery Table details
    table_spec = bigquery.TableReference(
        projectId='projectid',
        datasetId='dataflow_test',
        tableId='dogs')

    table_schema = {
        'fields': [
            {
                'name': 'weight', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'imperial', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'metric', 'type': 'STRING', 'mode': 'NULLABLE'}]
            },
            {
                'name': 'height', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'imperial', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'metric', 'type': 'STRING', 'mode': 'NULLABLE'}]
            },
            {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'bred_for', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'breed_group', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'life_span', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'temperament', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'origin', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'reference_image_id', 'type': 'STRING', 'mode': 'NULLABLE'},
            {
                'name': 'image', 'type': 'RECORD', 'mode': 'REPEATED',
                'fields': [{'name': 'height', 'type': 'INTEGER', 'mode': 'NULLABLE'},
                           {'name': 'id', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'url', 'type': 'STRING', 'mode': 'NULLABLE'},
                           {'name': 'width', 'type': 'INTEGER', 'mode': 'NULLABLE'}]
            },
        ]
    }

    # BigQueryDisposition.CREATE_IF_NEEDED:
    # Specifies that the write operation should create a new table if one does not exist.
    # If you use this value, you must provide a table schema.
    # CREATE_IF_NEEDED is the default behavior.

    # BigQueryDisposition.CREATE_NEVER:
    # Specifies that a table should never be created.
    # If the destination table does not exist, the write operation fails.

    # BigQueryDisposition.WRITE_EMPTY:
    # Specifies that the write operation should fail at runtime if the destination table is not empty.
    # WRITE_EMPTY is the default behavior.

    # BigQueryDisposition.WRITE_TRUNCATE:
    # Specifies that the write operation should replace an existing table.
    # Any existing rows in the destination table are removed, and the new rows are added to the table.

    # BigQueryDisposition.WRITE_APPEND:
    # Specifies that the write operation should append the rows to the end of the existing table.

    #p1 = beam.Pipeline(options=beam_options)
    with beam.Pipeline(options=beam_options) as pipeline:

        ingest_data = (
            pipeline
            | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
            | 'fetch API data' >> beam.ParDo(get_api_data())
            | 'write into gbq' >> beam.io.WriteToBigQuery(
                table=table_spec,
                schema=table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
            #| 'write to text' >> beam.io.WriteToText("./results.txt")
        )

        result = pipeline.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Here is the error stack trace:

  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 742, in process
    self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 627, in wait_for_bq_job
    raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_apitobqtest1_LOAD_STEP_465_21cc6601da786b6d152c6a0338a36bf7_1d78c9d4d4674091a7952fd88df97035 failed. Error Result: <ErrorProto
 location: 'gs://rb-munish-playground/temp/bq_load/843e8ffd21e346d5b754d3d15dff14da/rb-munish-playground.dataflow_test.dogs/35808f33-0892-4fe6-8cb0-5f90d7d11a89'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 2; errors: 1. Please look into the errors[] collection for more details.'
 reason: 'invalid'> [while running 'write into gbq/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']

2 Answers

Anonymous user

Look at the Cloud Storage folder specified by the temp_location pipeline option, or by the custom_gcs_temp_location parameter of beam.io.WriteToBigQuery. There you should find the files that were staged to be loaded into BigQuery.
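
For example, a quick way to list those staged load files from Python is with the google-cloud-storage client. This is only a sketch, not part of the original answer; the bucket name and temp prefix are taken from the question's temp_location and the error message.

# Hypothetical helper, not from the answer: list the JSON files that
# WriteToBigQuery staged under temp_location before the load job ran.
from google.cloud import storage

client = storage.Client(project='projectid')  # project taken from the question
for blob in client.list_blobs('rb-munish-playground', prefix='temp/bq_load/'):
    print(blob.name)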

You should also use gcloud and inspect your BigQuery load job with bq show -j, which will tell you which field caused the error. In my case, my schema was missing the liutravail field.
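
If you would rather inspect the failed job from Python instead of the bq CLI, a minimal sketch with the google-cloud-bigquery client could look like the following. The job ID is the one from the stack trace above; the project and location are assumptions based on the question's pipeline options, so adjust them to match your setup.

# A sketch, not part of the original answer: look up the failed load job
# and print its error details with the google-cloud-bigquery client.
from google.cloud import bigquery

client = bigquery.Client(project='projectid')  # project from the question
job = client.get_job(
    'beam_bq_job_LOAD_apitobqtest1_LOAD_STEP_465_21cc6601da786b6d152c6a0338a36bf7_1d78c9d4d4674091a7952fd88df97035',
    location='us-west2',  # assumed; use your dataset's location
)

print(job.error_result)   # the summary error, same as in the Beam stack trace
for err in job.errors or []:
    # the errors[] collection names the offending field/row
    print(err)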

Anonymous user

In general, do not make API requests in a ParDo. Beam parallelizes the work items in that ParDo; the pipeline can fan out to 1,000 or more concurrent requests and essentially become a JMeter load test. The API may not be able to handle that load.

Your example shows a single element making the request, which is safe. That said, you most likely do not need Beam at all for this little data.
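
To illustrate that last point, here is a rough no-Beam sketch of the same job. It reuses the URL, project, dataset and table from the question; letting BigQuery autodetect the schema is my assumption, not something this answer prescribes.

# A minimal sketch without Beam: fetch the API response and load it
# straight into BigQuery with the google-cloud-bigquery client.
import requests
from google.cloud import bigquery

rows = requests.get("https://api.thedogapi.com/v1/breeds/?limit=05").json()

client = bigquery.Client(project='projectid')
job_config = bigquery.LoadJobConfig(
    autodetect=True,  # assumed here instead of the hand-written schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
load_job = client.load_table_from_json(
    rows, 'projectid.dataflow_test.dogs', job_config=job_config)
load_job.result()  # wait for the load to finish and surface any errors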