提问者:小点点

找不到AccessLog类型的编码器。需要隐式编码器[AccessLog]将AccessLog实例存储在数据集中


你好,我正在处理一个问题与scala/火花项目试图做一些计算我的scala代码在火花外壳上工作得很好,但当尝试运行相同的代码与sbt程序集转换scala.jar文件,我面临这个错误:

找不到AccessLog类型的编码器。需要一个隐式编码器[AccessLog]来将AccessLog实例存储在数据集中。通过导入park. int,支持原始类型(Int,String等)和产品类型(case类)。_将在未来的版本中添加对序列化其他类型的支持。

我正在尝试将Dataset[List[String]]转换为Dataset[AccessLog]AccessLog是一个案例类,通过使用映射它。

错误截图

生成错误的代码:

import org.apache.spark.sql.{ Dataset, Encoder, SparkSession }
import org.apache.spark.sql.functions._

object DstiJob {

  // try and catch
  def run(spark: SparkSession, inputPath: String, outputPath: String): String = {
    // import spark.sqlContext.implicits._
    import spark.implicits._
    import org.apache.spark.sql.{ Encoder, Encoders }
    // implicit val enc: Encoder[AccessLog] = Encoders.product[AccessLog]

    val inputPath = "access.log.gz"
    val outputPath = "data/reports"
    val logsAsString = spark.read.text(inputPath).as[String]

    case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

    val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\""".r
    val dsParsed = logsAsString.flatMap(x => R.unapplySeq(x))
    def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(5), params(5), params(6), params(7), params(8), params(9))

    val ds: Dataset[AccessLog] = dsParsed.map(toAccessLog _)
    val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
    dsWithTime.cache
    dsWithTime.createOrReplaceTempView("AccessLog")

共1个答案

匿名用户

为了解决编译错误,应该在方法run之外定义case类。

而不是

object DstiJob {

    def run(spark: SparkSession, ...) {
       [...]
       case class AccessLog(...)
       val ds: Dataset[AccessLog] = ...
       [...]
    }
}

您可以使用

object DstiJob {

   case class AccessLog(...)

   def run(spark: SparkSession, ...) {
       [...]  
       val ds: Dataset[AccessLog] = ...
       [...]
   }
}

这应该可以解决问题,但不幸的是,我无法解释为什么这有帮助。