Asked by: 小点点

Flink Table API -> Streaming Sink?


I have seen examples that convert a Flink Table object to a DataStream and run StreamExecutionEnvironment.execute.

How would I code a continuous query that writes to a streaming sink using the Table API, without converting to a DataStream?

It seems like this must be possible, because otherwise what would be the purpose of specifying streaming sink table connectors?

The Table API documentation lists continuous queries and dynamic tables, but most of the actual Java APIs and code examples seem to use the Table API only for batch processing.

EDIT: To show David Anderson what I'm trying, here are the three Flink SQL CREATE TABLE statements, defined on top of analogous Derby SQL tables.

I see that the JDBC table connector sink supports streaming, but am I not configuring it correctly? I don't see anything that I'm overlooking. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html

FYI, once I get my toy example working, I plan to use Kafka in production for the actual input/output streams and JDBC/SQL for the lookup tables.

CREATE TABLE LookupTableFlink (
  `lookup_key` STRING NOT NULL,
  `lookup_value` STRING NOT NULL,
  PRIMARY KEY (lookup_key) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'LookupTable'
);

CREATE TABLE IncomingEventsFlink (
  `field_to_use_as_lookup_key` STRING NOT NULL,
  `extra_field` INTEGER NOT NULL,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'IncomingEvents'
);

CREATE TABLE TransformedEventsFlink (
  `field_to_use_as_lookup_key` STRING,
  `extra_field` INTEGER,
  `lookup_key` STRING,
  `lookup_value` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'TransformedEvents'
);
String sqlQuery =
                "SELECT\n" +
                "  IncomingEventsFlink.field_to_use_as_lookup_key, IncomingEventsFlink.extra_field,\n" +
                "  LookupTableFlink.lookup_key, LookupTableFlink.lookup_value\n" +
                "FROM IncomingEventsFlink\n" +
                "LEFT JOIN LookupTableFlink FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime\n" +
                "ON (IncomingEventsFlink.field_to_use_as_lookup_key = LookupTableFlink.lookup_key)\n";

Table joinQuery = tableEnv.sqlQuery(sqlQuery);
// This seems to run, return, and complete and doesn't seem to run in continuous/streaming mode.
TableResult tableResult = joinQuery.executeInsert("TransformedEventsFlink");
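One detail worth noting about the snippet above (a hedged sketch, assuming the Flink 1.12 `TableResult` API): `executeInsert` submits the job asynchronously and returns as soon as submission succeeds, so the method call completing does not by itself mean the query finished. If the client should block until the job ends, the returned `TableResult` can be awaited:

```java
// executeInsert() submits the INSERT job and returns immediately with a
// TableResult handle; it does not wait for the job to finish.
TableResult tableResult = joinQuery.executeInsert("TransformedEventsFlink");

// Block the client until the job reaches a terminal state (throws if the
// job fails). For an unbounded streaming source this would block forever;
// a bounded source lets the job complete once all input is consumed.
tableResult.await();
```

Whether the query runs continuously then depends on whether the source table is unbounded, not on the `executeInsert` call itself.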

2 answers

Anonymous user

You can write to a dynamic table using executeInsert, as in

Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");

The documentation is here.

Anonymous user

This is explained here.

The code example can be found here:

// get StreamTableEnvironment. 
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer> 
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
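With the legacy `toAppendStream`/`toRetractStream` conversions shown above, the calls only build the dataflow graph; the job still has to be launched explicitly on the StreamExecutionEnvironment. A minimal sketch (assuming `env` is the StreamExecutionEnvironment that `tableEnv` was created from):

```java
// Consume the retract stream: keep only INSERT changes (f0 == true),
// unwrap the Row payload, and send it to a sink (print() as a stand-in).
retractStream
    .filter(change -> change.f0)   // true = INSERT, false = DELETE
    .map(change -> change.f1)      // extract the Row
    .print();                      // replace with addSink(...) in practice

// Nothing runs until the job is submitted; this call blocks the client
// (or keeps the job running indefinitely for unbounded sources).
env.execute("table-to-datastream job");
```

This is the step the question hints at: with the DataStream route, `env.execute` is what starts the continuous job.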