Apache функция flink keyby с выражением поля - PullRequest
0 голосов
/ 25 февраля 2020

Примеры сообщений:

>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}

Я пытаюсь создать агрегацию потока методом keyby.

DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindowAll(Time.minutes(5)).sum("value");

Я ожидал

{"sensor": "temp1", "value": 36.000000, "timestamp":19200230 }
{"sensor": "temp2", "value": 10.000000, "timestamp":19200230 }

Но получил:

{"sensor": "temp1", "value": 46.000000, "timestamp":19200230 }

Что мне здесь не хватает?

1 Ответ

2 голосов
/ 25 февраля 2020

Вы используете timeWindowAll из класса DataStream вместо timeWindow из KeyedDataStream, в результате чего получается код, игнорирующий keyBy.

Попробуйте это:

DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindow(Time.minutes(5)).sum("value");
...