Flink SQL: повторение ключей группировки в результате запроса GROUP BY - PullRequest
0 голосов
/ 31 августа 2018

Я хочу сделать простой запрос в Flink SQL в одной таблице, которая включает оператор group by. Но в результатах есть повторяющиеся строки для столбца, указанного в выражении group by. Это потому, что я использую потоковую среду, а она не запоминает состояние?

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID

FlinkKafkaConsumer011<BlocksTransactions> flinkBlocksTransactionsConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksTransactionsSchema(), props);
flinkBlocksTransactionsConsumer.setStartFromEarliest();

DataStream<BlocksTransactions> blocksTransactions = env.addSource(flinkBlocksTransactionsConsumer);


tableEnv.registerDataStream("blocksTransactionsTable", blocksTransactions);

Table sqlResult
        = tableEnv.sqlQuery(
                "SELECT block_hash, count(tx_hash) " +
                "FROM blocksTransactionsTable " +
                "GROUP BY block_hash");

DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult, Row.class)
        .map(t -> {
            Row r = t.f1;
            String field2 = r.getField(0).toString();
            long count = Long.valueOf(r.getField(1).toString());
            return new Test(field2, count);
        })
        .returns(Test.class);

resultStream.print();

resultStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "TargetTopic", new TestSchema()));

env.execute();

Я использую оператор group by для столбца block_hash, но у меня несколько раз один и тот же block_hash. Это результат печати ():

Test {field2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count = 1} Test {field2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count = 1} Test {field2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count = 2} Test {field2 = '0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60', count = 1} Test {field2 = '0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count = 1} Test {field2 = '0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count = 1}

Как это исправить без использования BatchEnvironment?

1 Ответ

0 голосов
/ 31 августа 2018

Запрос GROUP BY, выполняемый в потоке, должен производить обновления. Рассмотрим следующий пример:

SELECT user, COUNT(*) FROM clicks GROUP BY user;

Каждый раз, когда таблица clicks получает новую строку, число соответствующих user необходимо увеличивать и обновлять.

Когда вы конвертируете Table в DataStream, эти обновления должны быть закодированы в потоке. Flink использует ретракцию и добавляет сообщения для этого Позвонив по номеру tEnv.toRetractStream(table, Row.class), вы конвертируете Table table в DataStream<Tuple2<Boolean, Row>. Флаг Boolean важен и указывает, добавлен или убран Row из таблицы результатов.

Учитывая приведенный выше пример запроса и входную таблицу clicks как

user | ...
------------
Bob  | ...
Liz  | ...
Bob  | ...

Вы получите следующий поток отвода

(+, (Bob, 1)) // add first result for Bob
(+, (Liz, 1)) // add first result for Liz
(-, (Bob, 1)) // remove outdated result for Bob
(+, (Bob, 2)) // add updated result for Bob

Вам нужно самостоятельно поддерживать результат, добавлять и удалять строки, как указано флагом Boolean потока отвода.

...