Как получить ключ DataStream после keyBy () в Flink Java API - PullRequest
0 голосов
/ 08 июля 2019

Я читаю из кластера Kafka в потоковом приложении Flink. После получения исходного потока я хочу агрегировать события с помощью составного ключа и временного окна timeEvent, а затем записать результат в таблицу. Проблема в том, что после применения моей агрегатной функции, которая просто подсчитывает количество кликов по clientId, я не нахожу способ получить ключ каждой выходной записи, поскольку API возвращает экземпляр накопленного результата, но не соответствующий ключ.

    DataStream<Event> stream = environment.addSource(mySource)

    stream.keyBy(new KeySelector<Event,Integer>() {
    public Integer getKey(Event event) { return event.getClientId(); })
.window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new MyAggregateFunction)

Как я могу получить ключ, который я указал ранее? Я не вводил ключ ввода событий в аккумуляторе, так как чувствовал, что мне было бы нехорошо.

1 Ответ

1 голос
/ 09 июля 2019

Вместо

.aggregate(new MyAggregateFunction)

вы можете использовать

.aggregate(new MyAggregateFunction, new MyProcessWindowFunction)

, и в этом случае методу процесса вашей ProcessWindowFunction будет передан ключ вместе с предварительно агрегированным результатомваша AggregateFunction и объект Context с другой потенциально важной информацией.См. Раздел в документации по Функция ProcessWindow с инкрементальной агрегацией для получения более подробной информации.

...