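I've seen examples that convert a Flink Table object to a DataStream and then run StreamExecutionEnvironment.execute.
How would I write a continuous query that uses the Table API to write to a streaming sink without converting to a DataStream?
It seems this must be possible, because otherwise what would be the point of specifying a streaming sink table connector?
The Table API documentation covers continuous queries and dynamic tables, but most of the actual Java API and code examples seem to use the Table API only for batch processing.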
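EDIT: To show David Anderson what I'm attempting, here are three Flink SQL CREATE TABLE statements defined on top of similar Derby SQL tables.
I can see that the JDBC table connector's sink supports streaming, so have I configured something incorrectly? I don't see anything I've overlooked. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
FYI, once I get my toy example working, I plan to use Kafka in production for the input/output stream-like data and JDBC/SQL for the lookup tables.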
CREATE TABLE LookupTableFlink (
`lookup_key` STRING NOT NULL,
`lookup_value` STRING NOT NULL,
PRIMARY KEY (lookup_key) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'LookupTable'
);
CREATE TABLE IncomingEventsFlink (
`field_to_use_as_lookup_key` STRING NOT NULL,
`extra_field` INTEGER NOT NULL,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'IncomingEvents'
);
CREATE TABLE TransformedEventsFlink (
`field_to_use_as_lookup_key` STRING,
`extra_field` INTEGER,
`lookup_key` STRING,
`lookup_value` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'TransformedEvents'
);
String sqlQuery =
"SELECT\n" +
" IncomingEventsFlink.field_to_use_as_lookup_key, IncomingEventsFlink.extra_field,\n" +
" LookupTableFlink.lookup_key, LookupTableFlink.lookup_value\n" +
"FROM IncomingEventsFlink\n" +
"LEFT JOIN LookupTableFlink FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime\n" +
"ON (IncomingEventsFlink.field_to_use_as_lookup_key = LookupTableFlink.lookup_key)\n";
Table joinQuery = tableEnv.sqlQuery(sqlQuery);
// This seems to run, return, and complete and doesn't seem to run in continuous/streaming mode.
TableResult tableResult = joinQuery.executeInsert("TransformedEventsFlink");
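The semantics of the LEFT JOIN ... FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime in the query above can be modelled in plain Java, without Flink. This is a hypothetical sketch (LookupJoinModel, upsertLookup and onEvent are illustrative names, not Flink API): each incoming event is enriched with whatever the lookup table holds at the moment the event is processed, a key with no match yields null lookup columns, and rows already emitted are not revised when the lookup table changes later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a processing-time lookup join (not Flink code).
final class LookupJoinModel {

    // One output row, mirroring the columns of TransformedEventsFlink.
    static final class Joined {
        final String fieldToUseAsLookupKey;
        final int extraField;
        final String lookupKey;   // null when the LEFT JOIN finds no match
        final String lookupValue; // null when the LEFT JOIN finds no match

        Joined(String fieldToUseAsLookupKey, int extraField, String lookupKey, String lookupValue) {
            this.fieldToUseAsLookupKey = fieldToUseAsLookupKey;
            this.extraField = extraField;
            this.lookupKey = lookupKey;
            this.lookupValue = lookupValue;
        }

        @Override
        public String toString() {
            return "(" + fieldToUseAsLookupKey + ", " + extraField + ", " + lookupKey + ", " + lookupValue + ")";
        }
    }

    private final Map<String, String> lookupTable = new HashMap<>();
    private final List<Joined> output = new ArrayList<>();

    // Stands in for a change to LookupTable in the database.
    void upsertLookup(String key, String value) {
        lookupTable.put(key, value);
    }

    // Stands in for one IncomingEvents row arriving: look up the current value "as of proctime".
    void onEvent(String key, int extra) {
        String value = lookupTable.get(key);
        output.add(new Joined(key, extra, value == null ? null : key, value));
    }

    List<Joined> output() {
        return output;
    }

    public static void main(String[] args) {
        LookupJoinModel join = new LookupJoinModel();
        join.upsertLookup("a", "alpha");
        join.onEvent("a", 1);            // matches the current value "alpha"
        join.onEvent("b", 2);            // no match: lookup columns are null
        join.upsertLookup("a", "ALPHA");
        join.onEvent("a", 3);            // sees the new value; the row for event 1 is left unchanged
        join.output().forEach(System.out::println);
    }
}
```

Running main prints (a, 1, a, alpha), (b, 2, null, null) and (a, 3, a, ALPHA) on three lines.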
You can write to a dynamic table with executeInsert, for example:
Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
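Below is a minimal, self-contained sketch of this approach in streaming mode. It assumes Flink 1.12 or later (where TableResult.await() is available) and substitutes the built-in datagen and print connectors for Orders and OutOrders, whose real definitions are not given here; the column names order_id and amount and the filter are hypothetical. executeInsert only submits the job and returns; whether the job keeps running depends on its sources. The JDBC connector page linked in the question lists its scan source as bounded, so an INSERT that reads IncomingEventsFlink through JDBC finishes once that table has been read, whereas an unbounded source such as datagen here (or the Kafka connector planned for production) keeps the same INSERT running.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import static org.apache.flink.table.api.Expressions.$;

public class ContinuousInsertSketch {

    // Hypothetical unbounded source: datagen keeps producing random rows until the job is cancelled.
    static final String ORDERS_DDL =
        "CREATE TABLE Orders (\n" +
        "  order_id BIGINT,\n" +
        "  amount INT\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +
        "  'rows-per-second' = '5'\n" +
        ")";

    // Hypothetical streaming sink: print writes every row it receives to standard output.
    static final String OUT_ORDERS_DDL =
        "CREATE TABLE OutOrders (\n" +
        "  order_id BIGINT,\n" +
        "  amount INT\n" +
        ") WITH (\n" +
        "  'connector' = 'print'\n" +
        ")";

    public static void main(String[] args) throws Exception {
        // Pure Table API in streaming mode: no StreamExecutionEnvironment, no DataStream conversion.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        tableEnv.executeSql(ORDERS_DDL);
        tableEnv.executeSql(OUT_ORDERS_DDL);

        Table orders = tableEnv.from("Orders");
        // executeInsert submits the continuous query and returns without waiting for it to finish.
        TableResult result = orders.filter($("amount").isGreater(0)).executeInsert("OutOrders");

        // Keep the client attached until the job terminates (here: only when it is cancelled).
        result.await();
    }
}
```

The call to await() matters mainly when running from an IDE or a short-lived main method; without it the client may exit while the submitted job is still running.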
The documentation is here.
This is explained here.
A code example can be found here:
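import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
Table table = ...;
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING,
Types.INT);
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);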
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
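The INSERT/DELETE flag matters for queries whose results change over time, such as a continuous aggregation; converting such an updating table with toAppendStream fails, which is why toRetractStream exists. The following plain-Java model (RetractStreamModel, Change and runningCount are hypothetical names, not Flink API) mimics the messages a retract stream carries for a running SELECT name, COUNT(*) ... GROUP BY name: each update is encoded as a retraction of the old result row (false) followed by an insertion of the new one (true).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a retract stream for a running per-name count (not Flink code).
final class RetractStreamModel {

    // One retract-stream message: (flag, row) where row = (name, count).
    static final class Change {
        final boolean isInsert; // true = INSERT (add row), false = DELETE (retract row)
        final String name;
        final long count;

        Change(boolean isInsert, String name, long count) {
            this.isInsert = isInsert;
            this.name = name;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + isInsert + ", " + name + ", " + count + ")";
        }
    }

    // Emits the retract messages produced as each input name arrives.
    static List<Change> runningCount(List<String> names) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String name : names) {
            Long previous = counts.get(name);
            if (previous != null) {
                out.add(new Change(false, name, previous)); // retract the outdated result row
            }
            long updated = previous == null ? 1 : previous + 1;
            counts.put(name, updated);
            out.add(new Change(true, name, updated));       // insert the updated result row
        }
        return out;
    }

    public static void main(String[] args) {
        runningCount(List.of("alice", "bob", "alice")).forEach(System.out::println);
    }
}
```

For the input alice, bob, alice, main prints (true, alice, 1), (true, bob, 1), (false, alice, 1) and (true, alice, 2): the second "alice" first retracts the stale count before inserting the new one.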