Как вставить в втягивающуюся раковину из втягивающего потока с помощью планировщика мерцания - PullRequest
0 голосов
/ 20 июня 2020

Я пытаюсь взять Flink Table и преобразовать его в втягивающуюся раковину, которая затем подключается к раковине. Я смог сделать это в исходном планировщике таблиц, используя CRow, но похоже, что планировщик Blink Flink больше не поддерживает CRow. Есть ли способ выполнить sh это при использовании планировщика Blink?

Для справки, мы могли сделать это раньше, сопоставив отводящий поток с типом CRow перед подключением его к RetractStreamTableSink.

Ниже приведен пример модульного теста того, что я пытаюсь выполнить sh, обратите внимание, что закомментированный блок кода работает правильно в старом планировщике.

Это не удается со следующим исключением , что имеет смысл, учитывая, что убирающийся поток имеет тип Tuple2<Boolean, Row>, а Sink имеет тип Row, но я не вижу способа использовать Tuple2 втягивающий DataStream с RetractStreamTableSink<Row>

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
    @Test
    public void retractStream() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

        Row row1 = new Row(2);
        row1.setField(0, "1");
        row1.setField(1, "2");

        SingleOutputStreamOperator<Row> source =
                executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

        tableEnvironment.createTemporaryView("table1", source, "key, id");
        Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
        RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
        // as POJOs
        // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
        //         .toRetractStream(outputTable, rowTypeInfo)
        //         .map(value -> new CRow(value.f1, value.f0))
        //         .returns(new CRowTypeInfo(rowTypeInfo));

        // Create the retracting stream
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
        tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

        // Create a sink
        TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
        CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
        RetractSink retractTableSink = new RetractSink(collectingTableSink);
        tableEnvironment.registerTableSink("sink2", retractTableSink);

        // Wire up the table and the sink (this is what fails)
        tableEnvironment.from("outputTable").insertInto("sink2");
        executionEnvironment.execute();
        System.out.println(collectingTableSink.rows);
    }

1 Ответ

0 голосов
/ 22 июня 2020
• 1000 из Tuple2 в Row без необходимости CRow.

Это именно то, для чего предназначен RetractStreamTableSink, но что-то вызывает сбой Blink при его использовании (даже в случае, когда оба AppendStreamTableSink и RetractStreamTableSink идентичны (все методы переопределены и равны, разница только в названии интерфейса, который вы реализуете). Я сильно подозреваю, что это ошибка в планировщике Blink, но мне не удалось определить, откуда она исходит.

Фрагмент кода, который выполняет преобразование:

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> filteredAndMapped =
                dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());

        return delegate.consumeDataStream(filteredAndMapped);
}
...