提问者:小点点

确定哪个对象在Apache-Flink中不可序列化


我正在编写一个 Flink 转换器,我有一个具有以下属性的自定义对象直方图

case class Histogram(
  nRows: Int,
  nCols: Int,
  min: Int,
  step: Double,
  private val countMatrix: Array[ArrayBuffer[Double]],
  private val cutMatrixL1: Array[ArrayBuffer[Double]],
  val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
  private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
  private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
  extends Serializable {
    ???
}

这是我的FitOperation

implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
    override def fit(
                      instance: PIDiscretizerTransformer,
                      fitParameters: ParameterMap,
                      input: DataSet[LabeledVector]): Unit = {

      // get params...

      val metric = input.map { x ⇒
        // (instance, histrogram totalCount)
        (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
      }.reduce { (m1, m2) ⇒
        // Update Layer 1
        val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)

        //         Update Layer 2 if neccesary
        val updatedL2 = if (m1._3 % l2updateExamples == 0) {
          updateL2(m1._1, updatedL1)
        } else updatedL1

        (m2._1, updatedL2, m1._3 + 1)
      }.map(_._2)

      //      instance.metricsOption = Some(metric)
    }
  }

这样做很好,但是如果我取消最后一行的注释:< code > instance . metrics option = Some(metric)我会得到一个< code > Java . io . notserializableexception:org . Apache . flink . API . Scala . dataset

我如何在我的类直方图中找到导致问题的对象?据我所知,<code>ArrayBuffer</code>是可序列化的,Map也是如此。尽管我发现了这个SO问题:

地图不能在scala中序列化?

上面写着<代码>。mapValues不可序列化,但我没有使用< code >。mapValues任意位置。


共1个答案

匿名用户

问题是您引用的是<code>实例。在MapFunction中执行步骤实例的类型为PIDiscretizerTransformer,无法序列化。因此,您需要在MapFunction之外计算步骤,并将值传递到函数中。那么您的程序应该是可序列化的。