I am getting this error when trying to write to BigQuery using the spark-bigquery connector. The application runs on a Hadoop cluster (not Dataproc).
java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:236)
    ...
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:664)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter...
Here is the code:
dataset.write().format("bigquery")
    .option("temporaryGcsBucket", tempGcsBucket)
    //.option("table", databaseName + "." + tableName)
    .option("project", projectId)
    .option("parentProject", parentProjectId)
    .option("credentials", credentials)
    .mode(saveMode)
    .save(projectId + "." + databaseName + "." + tableName);
I am able to read from the same table I am trying to write to, using the same credentials (a base64-encoded service account key). I am using version spark-bigquery-with-dependencies_2.11-0.19.1.jar of the connector.
The same code works fine in a lower environment where the project and the parent project are the same. Here, however, they are different.
I solved this problem by modifying the code from this question. First you need to configure your Spark session:
spark._jsc.hadoopConfiguration().set("fs.gs.project.id", <project_id>)
spark._jsc.hadoopConfiguration().set("fs.gs.system.bucket", <bucket>)
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.email", <client_email>)
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.private.key.id", <private_key_id>)
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.private.key", <private_key>)
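For illustration, the values above all come from the downloaded service-account key JSON, so they can be read out of it with the standard json module rather than pasted by hand. This is just a sketch: the stand-in key below is abbreviated (real keys contain more fields), and the commented-out loop assumes a live SparkSession named spark.

```python
import json

# Abbreviated stand-in for a downloaded service-account key file
key_json = '''{
  "project_id": "my-project",
  "client_email": "sa@my-project.iam.gserviceaccount.com",
  "private_key_id": "abc123",
  "private_key": "-----BEGIN PRIVATE KEY-----\\n...\\n-----END PRIVATE KEY-----\\n"
}'''

key = json.loads(key_json)

# Map the key's fields to the Hadoop configuration properties used above
gcs_conf = {
    "fs.gs.project.id": key["project_id"],
    "fs.gs.auth.service.account.enable": "true",
    "fs.gs.auth.service.account.email": key["client_email"],
    "fs.gs.auth.service.account.private.key.id": key["private_key_id"],
    "fs.gs.auth.service.account.private.key": key["private_key"],
}

# Then, with a live SparkSession:
# for k, v in gcs_conf.items():
#     spark._jsc.hadoopConfiguration().set(k, v)
```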
Then you need to convert the key JSON to base64. I used this simple helper:
import base64

def to_base64(creds):
    creds_bytes = creds.encode('ascii')
    base64_bytes = base64.b64encode(creds_bytes)
    return base64_bytes.decode('ascii')
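A quick round-trip check of the helper (the key string here is a shortened stand-in, not a real key):

```python
import base64

def to_base64(creds):
    creds_bytes = creds.encode('ascii')
    base64_bytes = base64.b64encode(creds_bytes)
    return base64_bytes.decode('ascii')

# Shortened stand-in for the key JSON
creds_json = '{"type": "service_account", "project_id": "my-project"}'
encoded = to_base64(creds_json)

# Decoding the option value recovers the original JSON
assert base64.b64decode(encoded).decode('ascii') == creds_json
```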
Then write the data:
(dataFrame
    .write
    .format("bigquery")
    .mode("append")
    .option("credentials", to_base64(<creds>))
    .option("parentProject", <project_id>)
    .option("temporaryGcsBucket", <bucket>)
    .option("table", <table_name>)
    .save()
)
I am not entirely sure whether all of these hadoopConfiguration options are needed, but at least this worked for me.
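As a sanity check before submitting the job, you can verify that the string passed to the credentials option really decodes from base64 back to service-account key JSON (a common pitfall is passing the raw JSON, or double-encoding it). This is a sketch with an abbreviated stand-in key; the helper name is my own:

```python
import base64
import json

def is_valid_credentials_option(encoded):
    """Check that a string decodes from base64 to service-account key JSON."""
    try:
        decoded = base64.b64decode(encoded, validate=True).decode("ascii")
        return json.loads(decoded).get("type") == "service_account"
    except (ValueError, UnicodeDecodeError):
        return False

# Abbreviated stand-in key
raw = '{"type": "service_account", "project_id": "my-project"}'
encoded = base64.b64encode(raw.encode("ascii")).decode("ascii")

print(is_valid_credentials_option(encoded))   # True
print(is_valid_credentials_option(raw))       # False: not base64-encoded yet
```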