提问者:小点点

Dataflow作业中的ModuleNotFoundError


我正在尝试在Google Cloud Platform中执行apache光束管道作为数据流作业。

我的项目结构如下:

root_dir/
  __init__.py
  ​setup.py
  ​main.py
  ​utils/
    __init__.py
    log_util.py
    config_util.py

这是我的setup.py

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

这是我的管道代码:

import math
import apache_beam as beam

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                        pipeline
                        | 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
                        | 'Count records' >> beam.combiners.Count.Globally()
                        | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")

            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process  - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

管道的功能是

  1. 对BigQuery表进行查询。
  2. 计算从查询中获取的总记录。
  3. 使用utils文件夹中的自定义日志模块打印。

我正在使用命令-python3-main.py使用Cloud shell运行作业

尽管Dataflow作业启动,但工作节点在几分钟后抛出错误,说“ModuleNotFoundError: No module name'utils'”

“utils”文件夹可用,相同的代码在使用“DirectRunner”执行时可以正常工作。

log_utilconfig_util文件分别是用于日志记录和配置获取的自定义util文件。

另外,我尝试使用setup_file选项作为python3-main.py--setup_file

如何使用“DataflowRunner”解决ModuleNotFoundError?


共2个答案

匿名用户

发布为社区wiki。经@GopinathS确认,错误和修复如下:

工作人员遇到的错误是BeamSDK基本版本2.32.0与DataflowPython工作人员版本2.28.0不匹配。请检查Dataflow工作人员启动日志并确保安装了正确版本的BeamSDK。

要修复这个“apache-光束[gcp]

更新setup.py:

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

更新了流水线代码中的beam_options

    beam_options = {
        "project": self.project,
        "region": self.region,
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": f"gs://{self.bucket}/temp_location/",
        "setup_file": "./setup.py"
    }

还要确保一次传递所有管道选项,而不是部分传递。

如果你通过--setup_file

为了避免解析参数并附加到beam_options,我直接在beam_options中添加了它作为"setup_file":"./setup.py"

匿名用户

数据流在安装平台锁定在隔离网络中的包时可能会遇到问题。如果没有网络,它将无法编译它们。或者它可能尝试安装它们,但因为无法编译下载轮子?不知道。

仍然要能够使用像心理2(二进制文件)或google-cloud d-secure-manager(没有二进制文件,但依赖项有二进制文件)这样的包,你需要安装所有没有二进制文件(无任何)和没有二进制文件依赖项的东西,通过需求. txt,其余的通过轮子的--extra_packages参数。示例:

--extra_packages=package_1_needed_by_2-manylinux.whl \
--extra_packages=package_2_needed_by_3-manylinux.whl \
--extra_packages=what-you-need_needing_3-none-any.whl