提问者:小点点

如何使用Database ricks集群(Scala)将数据从Eventhub摄取到ADLS


我想以指定的格式从Eventhub摄取流数据到ADLS gen2。

我做了批量数据摄取,从DB到ADLS和容器到容器,但现在我想尝试流式数据摄取。

你能指导我从哪里开始继续下一步吗?我确实在Azure中创建了Eventhub、Database rick实例和存储帐户。


共1个答案

匿名用户

您只需要遵循EventHubs Spark连接器的留档(对于Scala,对于Python)。代码最简单的方式如下(对于Python):

readConnectionString = "..."
ehConf = {}
# this is required for versions 2.3.15+
ehConf['eventhubs.connectionString']=sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)

df = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

# casting binary payload to String (but it's really depends on the 
# data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))

# write data to storage
stream = cdf.writeStream.format("delta")\
  .outputMode("append")\
  .option("checkpointLocation", "/path/to/checkpoint/directory")\
  .start("ADLS location")

您可能需要添加其他选项,例如起始位置等,但所有内容都在留档中描述得很好。