Asked by: 小点点

Flink's JDBC sink fails with a "not serializable" error


I'm following https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/jdbc.html to use a MySQL database as a sink for Flink. The code compiles, but running the job on the Flink cluster fails with:

The program finished with the following exception:

The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1899)
        org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
        org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1296)
        org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1131)
        Aggregator.Aggregator$.main(Aggregator.scala:81)

Here is the relevant part of the code:

object Aggregator {
  @throws[Exception]
  def main(args: Array[String]): Unit = {

    [...]

    val counts = stream.map { x => (
        x.get("value").get("id").asInt(),
        x.get("value").get("kpi").asDouble()
      )}
      .keyBy(0)
      .timeWindow(Time.seconds(60))
      .sum(1)

    counts.print()

    val statementBuilder: JdbcStatementBuilder[(Int, Double)] = (ps: PreparedStatement, t: (Int, Double)) => {
      ps.setInt(1, t._1)
      ps.setDouble(2, t._2)
    }

    val connection = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName("com.mysql.cj.jdbc.Driver") // MySQL Connector/J 8.x driver class
      .withPassword("XXX")
      .withUrl("jdbc:mysql://<DB_HOST>:3306/<DB_NAME>")
      .withUsername("<USERNAME>")
      .build()

    val jdbcSink = JdbcSink.sink(
      "INSERT INTO table (id, kpi) VALUES (?, ?)",
      statementBuilder,
      connection)

    counts.addSink(jdbcSink)
 
    env.execute("Aggregator")
  }
}

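I'm not sure which part of the code causes the problem, or how to debug it. Unfortunately, I also couldn't find a reference implementation of the JDBC sink in Scala. Any help is appreciated!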


1 Answer

Anonymous user

What worked for me was creating the JdbcStatementBuilder explicitly, as an anonymous class instead of a Scala lambda. Something like:

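import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.JdbcStatementBuilder

// Explicit anonymous class instead of a lambda; JdbcStatementBuilder extends
// java.io.Serializable, and this instance holds no non-serializable state.
val statementBuilder: JdbcStatementBuilder[(Int, Double)] =
  new JdbcStatementBuilder[(Int, Double)] {
    override def accept(ps: PreparedStatement, t: (Int, Double)): Unit = {
      ps.setInt(1, t._1)    // first "?" -> id
      ps.setDouble(2, t._2) // second "?" -> kpi
    }
  }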
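On the "how to debug" part of the question: the top frames of the stack trace are Flink's ClosureCleaner, which performs its check by serializing the sink with Java serialization. Running the same kind of serialization yourself on the individual pieces (for example `statementBuilder` and `connection`) should show which one fails. Below is a minimal sketch using only `java.io`; the names `SerializationCheck` and `findNotSerializable` are hypothetical helpers, not part of Flink.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical debugging helper (not a Flink API).
object SerializationCheck {
  // Attempts plain Java serialization of `obj`.
  // Returns None on success, or Some(className) naming the first object in the
  // graph that is not Serializable. Other IOExceptions are not caught here.
  def findNotSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      // The exception message is the class name of the offending object.
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

For example, calling `println(SerializationCheck.findNotSerializable(statementBuilder))` in `main` just before `counts.addSink(jdbcSink)` prints `None` if that object serializes, or the name of the class that blocks serialization.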