Asked by: 小点点

com.microsoft.azure.eventhubs.EventHubException error while reading events from Event Hub


Following the documented steps for Structured Streaming with PySpark, I am able to create the connection but cannot read any data. Below is the error I get.

Error:

Stream stopped...
java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.EventHubException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
    at org.apache.spark.sql.eventhubs.EventHubsSource.partitionCount(EventHubsSource.scala:81)
    at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$4(EventHubsSource.scala:96)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$2(EventHubsSource.scala:96)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:100)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:197)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:196)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:165)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:165)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:349)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: com.microsoft.azure.eventhubs.EventHubException
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:74)
    at com.microsoft.azure.eventhubs.impl.ReceiveLinkHandler.onLinkFinal(ReceiveLinkHandler.java:81)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:182)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Code:

from pyspark.sql.types import *
import pyspark.sql.functions as F

# Event Hub Namespace Name
NAMESPACE_NAME = "<>"
KEY_NAME = "<>"
KEY_VALUE = "<>"

# The connection string to your Event Hubs Namespace
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=testing".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)

ehConf = {}
# ehConf['eventhubs.connectionString'] = connectionString

# For versions 2.3.15 and above, the connection string in the configuration dictionary must be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

read_df = spark.readStream.format("eventhubs").options(**ehConf).load()

# defining the schema
read_schema = StructType([
  StructField("Color", StringType(), True),
  StructField("Value", StringType(), True)]
)
# Capturing the decoded payload in a DataFrame
decoded_df = read_df.select(F.from_json(F.col("body").cast("string"), read_schema).alias("payload"))

# To view the stream, write it out as JSON files with a checkpoint location

query1 = (decoded_df.writeStream
          .format("json")
          .option("path", "/user/sh/JsonStream/")
          .option("checkpointLocation", "/user/sh/EventHubCheckPoint/")
          .queryName("read_hub")
          .start())
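
For reference, since the sink writes JSON files rather than printing rows, the output can be inspected by reading the files back as a static DataFrame once the stream has processed data; query1.status and query1.exception() can also be checked while the stream is running. A minimal sketch, reusing the output path from above:

# Read the stream's output back for inspection (path assumed from above)
output_df = spark.read.json("/user/sh/JsonStream/")
output_df.show(truncate=False)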

Please help me figure out where this is going wrong.

Note: I have added com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.20 and am using DBR 8.4 with Spark 3.1.2 and Scala 2.12.


1 answer

Anonymous user

This is most likely caused by an incorrect dependency: DBR 7.x and above are built on Scala 2.12, so you need the connector compiled for Scala 2.12 (azure-eventhubs-spark_2.12) rather than the Scala 2.11 artifact (azure-eventhubs-spark_2.11) you attached.
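
As a minimal sketch of the fix (assuming a _2.12 build of the connector is published at version 2.3.20; adjust the version to what is actually available), the corrected coordinate looks like this when configuring a SparkSession yourself. On Databricks, install the same Maven coordinate as a cluster library instead of setting spark.jars.packages:

from pyspark.sql import SparkSession

# The _2.12 suffix must match the Scala version shipped with the runtime;
# DBR 7.x/8.x run Scala 2.12, so the _2.11 artifact fails at runtime.
spark = (SparkSession.builder
         .appName("eventhub-read")
         .config("spark.jars.packages",
                 "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.20")
         .getOrCreate())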