提问者:小点点

Spark:单元测试 - 我有一个函数可以联合 3 个输入数据集。我应该对它们进行单元测试吗?


我已经写了一个代码的一部分如下

Object Cal{
def mergedatasets(df: Dataset[Row], df1: Dataset[Row],df2: Dataset[Row]):Dataset[Row]={
 df.union(df1).union(df2)
//other logic

}

}
object readDataframes{
def readFirstDF(spark:SparkSession):Dataset[Row]={
 spark.read.json(somefile)
}
def readSecondDF(spark:SparkSession):Dataset[Row]={
 spark.read.json(somefile)
}
def readThirdDF(spark:SparkSession):Dataset[Row]={
 spark.read.json(somefile)
}
}

在上面的代码中,我正在读取3个文件,然后将它们合并为一个文件,以便进一步处理。基于上述场景,我的问题如下:

    < li >对mergdatasets函数进行单元测试有意义吗?如果是,需要测试的基本/最基本的东西是什么?如何检查拐角情况,如果有的话? < li >对readDataframes进行单元测试有意义吗?如果是,测试什么?是检查推断的模式是否如预期的那样吗?还有别的吗?

我也想为以下功能扩展上述问题

def timeIntervalAgg(df: Dataset[Row]): Dataset[Row] = {

    val timeInterval = df
      .groupBy("id","page_number")
      .agg(sum("timeInterval").alias("timeInterval"))
    timeIntervalAgg

  }

  def timeInterval(df: Dataset[Row]): Dataset[Row] ={

    val windowSpec = Window.partitionBy("id").orderBy("date_time")
    val timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
    val endTime = lead(col("date_time"),1).over(windowSpec)
    val startTime = col("date_time")
    val timeDiff = (unix_timestamp(endTime, timeFmt)
      - unix_timestamp(startTime, timeFmt))
    val timeInterval = df
      .withColumn("timeInterval", lit(when(col("event") === "this_event",lit(null)
        .cast("long"))
        .otherwise(timeDiff)))
      .where("""event != "this_event" """)
    timeInterval

  }

  def addOddpages(df: Dataset[Row]) :Dataset[Row] = {

    val odd = df
      .where("""view_mode = "twin" """)
      .withColumn("page_odd", col("page") + 1)
      .drop("page")
      .select(col("id"), col("date_time")
        .cast("timestamp"),col("page_odd")
        .alias("page"), col("page_view_mode"),
        col("event"),col("timeInterval"))
    val timeIntervalWithoddPage = df.union(odd)
    timeIntervalWithoddPage

  }

>

  • 请建议是否需要以更好的方式重构代码以实现更好的测试。

    我的目标是了解要测试的内容?在为上述代码编写测试时要注意什么?所有这些问题都是针对Spark代码单元测试的,而不是其他语言代码测试。


  • 共1个答案

    匿名用户

    读取JSON文件:如果您只是读取JSON文件,则不需要对此进行测试。此外,最好读取Schema()中具有显式模式的文件,以避免推断模式的一些问题。此外,您不需要3种相同的方法来读取文件。

    联合数据集:从Spark 2.3.0开始,就有了unionByName()函数。该函数按名称(而不是按位置)解析列。当您的DataFrames具有不同的列顺序时,您可以考虑这些函数以避免联合问题。当然,这个函数不需要测试。很难说mergedatasets()方法中的//其他逻辑代码。

    对于单元测试,您可以使用 ScalaTest 或其他工具。

    • 使用 master(“local”)创建 SparkSession;
    • 使用预期数据创建数据帧;
    • 为要测试的每个方法创建一个输入数据帧。
    • 比较预期和实际数据帧;

    以下项目可能很有用。您可以在其中找到如何比较两个DataFrames。此外,README:https://github.com/MrPowers/spark-fast-tests中有几个示例