Я пытаюсь взять Flink Table и преобразовать его в втягивающуюся раковину, которая затем подключается к раковине. Я смог сделать это в исходном планировщике таблиц, используя CRow
, но похоже, что планировщик Blink Flink больше не поддерживает CRow
. Есть ли способ выполнить sh это при использовании планировщика Blink?
Для справки, мы могли сделать это раньше, сопоставив отводящий поток с типом CRow
перед подключением его к RetractStreamTableSink
.
Ниже приведен пример модульного теста того, что я пытаюсь выполнить sh, обратите внимание, что закомментированный блок кода работает правильно в старом планировщике.
Это не удается со следующим исключением , что имеет смысл, учитывая, что убирающийся поток имеет тип Tuple2<Boolean, Row>
, а Sink имеет тип Row
, но я не вижу способа использовать Tuple2
втягивающий DataStream
с RetractStreamTableSink<Row>
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
@Test
public void retractStream() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);
Row row1 = new Row(2);
row1.setField(0, "1");
row1.setField(1, "2");
SingleOutputStreamOperator<Row> source =
executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);
tableEnvironment.createTemporaryView("table1", source, "key, id");
Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
// This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
// as POJOs
// SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
// .toRetractStream(outputTable, rowTypeInfo)
// .map(value -> new CRow(value.f1, value.f0))
// .returns(new CRowTypeInfo(rowTypeInfo));
// Create the retracting stream
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);
// Create a sink
TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
RetractSink retractTableSink = new RetractSink(collectingTableSink);
tableEnvironment.registerTableSink("sink2", retractTableSink);
// Wire up the table and the sink (this is what fails)
tableEnvironment.from("outputTable").insertInto("sink2");
executionEnvironment.execute();
System.out.println(collectingTableSink.rows);
}