我正在做一个Flink项目,遇到了一个问题,我设法在Stackoverflow的帮助下解决了这个问题。然而,我不清楚为什么提议的解决方案实际上有效,我发现关于这个主题的信息很少。考虑以下代码:
object DeCP {
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
// Get the execution environment and read the data
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val queryPoints: DataSet[Point] = readQueryPoints(env, params)
val points: DataSet[Point] = readFeatureVector(env, params)
// Process the query points
queryPoints
.map(new KNNRich)
.withBroadcastSet(points, "pointsIn")
.print
}
final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{
private var pointsIn: Traversable[Point] = _
override def open(parameters: Configuration): Unit =
pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala
def map(queryPoint: Point): (Point, Vector[Point]) = {
val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
.fromCollection(pointsIn.toVector)
val cluster = new Cluster(dataSetIn, queryPoint)
val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
(queryPoint, knn.collect.toVector)
}
}
}
Cluster类和伴随对象定义为:
class Cluster(var points: DataSet[Point],
var clusterLeader: Point) extends Serializable {
private var queryPoint: Point = _
def distance(p: Point): Point = {
p.eucDist(queryPoint)
}
def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {
this.queryPoint = queryPoint
this.points.map{p => distance(p)} // Task not serializable
this.points.map{p => p.eucDist(queryPoint)} // Works
this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works
}
}
object Cluster {
def staticDistance(queryPoint: Point, p: Point): Point = {
p.eucDist(queryPoint)
}
}
对< code>distance方法的调用导致任务不可序列化异常,但是用定义替换方法调用可以解决该问题。类似地,将完全相同的方法定义为companion对象的成员允许代码正常运行。
为什么第一个调用不起作用,而另外两个调用起作用?如果您在类上有更复杂的执行流程,而在配套对象上不容易替换为方法,会发生什么?
通过执行数据集转换,您只是创建了管道的逻辑规划。通过调用< code > execute/print/collect 将管道提交给群集。
当管道提交到集群时,每个函数(如您的RichMap函数)都被序列化,发送到集群,为每个并行实例复制,并独立执行。当您得到“任务不可序列化”异常时,这意味着您的
RichMap函数
正在瞬态引用该类之外的变量/对象。您应该确保函数是一个独立的块。
通过调用points.map{}
,你隐式地创建了一个MapFunction
。但是这个MapFunction
引用了Cluster
的实例,因此不是独立的。Flink 也尝试序列化集群
,但失败了。如果距离
是静态的(在伴随对象中定义),则不需要序列化 Cluster
。
顺便说一下,您的示例的另一个问题是,您没有按照预期使用数据集API。通常,您不应该在正在运行的管道中创建管道。这也可能导致意想不到的副作用。