Asked by: 小点点

Spark BigQuery connector: error getting access token from the metadata server


I am getting this error while trying to write to BigQuery using the spark-bigquery connector. The application runs from a Hadoop cluster (not Dataproc).

    java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
      at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:236)
      at com.google.cloud.hadoop.gcs.com.google...
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
      at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:664)
      at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter...

Here is the code:

    dataset.write().format("bigquery")
            .option("temporaryGcsBucket", tempGcsBucket)
            //.option("table", databaseName + "." + tableName)
            .option("project", projectId)
            .option("parentProject", parentProjectId)
            .option("credentials", credentials)
            .mode(saveMode).save(projectId + "." + databaseName + "." + tableName);

I am able to read from the same table I am trying to write to, using the same credentials (a base64-encoded service account key). I am using version spark-bigquery-with-dependencies_2.11-0.19.1.jar of the connector.

The same code works fine in lower environments, where the project and parent project are the same. Here, however, they are different.


1 Answer

Anonymous user

I solved this by modifying the code from the question. First, you need to configure your Spark session:

    spark._jsc.hadoopConfiguration().set("fs.gs.project.id", <project_id>)
    spark._jsc.hadoopConfiguration().set("fs.gs.system.bucket", <bucket>)
    spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
    spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.email", <client_email>)
    spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.private.key.id", <private_key_id>)
    spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.private.key", <private_key>)
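
For reference, a minimal sketch of filling those placeholders straight from a downloaded service-account key file (the file path is a placeholder, and `spark` is your existing SparkSession):

    import json

    # Load the downloaded service-account key file (the path is a placeholder)
    with open("/path/to/service-account.json") as f:
        key = json.load(f)

    # project_id, client_email, private_key_id, and private_key are
    # standard fields of a service-account key JSON
    hconf = spark._jsc.hadoopConfiguration()
    hconf.set("fs.gs.project.id", key["project_id"])
    hconf.set("fs.gs.system.bucket", "<bucket>")  # your temporary GCS bucket
    hconf.set("fs.gs.auth.service.account.enable", "true")
    hconf.set("fs.gs.auth.service.account.email", key["client_email"])
    hconf.set("fs.gs.auth.service.account.private.key.id", key["private_key_id"])
    hconf.set("fs.gs.auth.service.account.private.key", key["private_key"])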

Then you need to convert the key JSON to base64. I used this simple helper:

    import base64

    def to_base64(creds):
        # Encode the key JSON string as base64, which is what the
        # connector's "credentials" option expects
        creds_bytes = creds.encode('ascii')
        base64_bytes = base64.b64encode(creds_bytes)
        return base64_bytes.decode('ascii')
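
For example, applied to a key file on disk (the path is a placeholder):

    with open("/path/to/service-account.json") as f:
        creds_b64 = to_base64(f.read())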

Then write the data:

    (dataFrame
     .write
     .format("bigquery")
     .mode("append")
     .option("credentials", to_base64(<creds>))
     .option("parentProject", <project_id>)
     .option("temporaryGcsBucket", <bucket>)
     .option("table", <table_name>)
     .save()
    )
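
As a quick sanity check, the same credentials should also let you read the table back; a sketch reusing the placeholders above:

    df = (spark.read
          .format("bigquery")
          .option("credentials", to_base64(<creds>))
          .option("parentProject", <project_id>)
          .option("table", <table_name>)
          .load())
    df.show(5)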

I'm not entirely sure whether you need all of these hadoopConfiguration options, but at least this worked for me.