提问者:小点点

如何在Spark 2. X数据集中创建自定义编码器?


对于Pojo的/原语,Spark数据集从Row的Encoder移动到Encoder。Catalyst引擎使用ExpressionEncoder转换SQL表达式中的列。然而,似乎没有其他Encoder子类可用作我们自己实现的模板。

以下是在Spark 1. X/DataFrames中满意的代码示例,该代码在新制度中无法编译:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

我们得到一个编译器错误

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

所以不管怎样/某个地方应该有办法

  • 定义/实现我们的自定义编码器
  • DataFrame(现在是Row类型的数据集)上执行映射时应用它
  • 注册编码器以供其他自定义代码使用

我正在寻找成功执行这些步骤的代码。


共3个答案

匿名用户

据我所知,自1.6以来没有什么真正改变,如何在数据集中存储自定义对象?中描述的解决方案是唯一可用的选项。然而,您当前的代码应该可以很好地与产品类型的默认编码器一起工作。

要了解为什么您的代码在1. x中工作而在2.0.0中可能不工作,您必须检查签名。在1.x中DataFrame.map是一个采用函数Row=的方法

在2.0.0中DataFrame.map接受类型为Row=的函数

df.rdd.map(row => ???)

对于数据集[行]map,请参阅尝试将数据框行映射到更新行时的编码器错误

匿名用户

你导入隐式编码器了吗?

导入_

http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder

匿名用户

_其中火花是SparkSession,它解决了错误和自定义编码器得到了进口。

此外,编写自定义编码器是我从未尝试过的一种方法。

工作解决方案:-创建SparkSession并导入以下内容

导入_