Как сложить несколько полей в Flink - PullRequest
0 голосов
/ 08 апреля 2020

Я суммирую одно поле в Apache Флинк 1.10 вот так, я получаю сообщения RabbitMQ и обрабатываю их в памяти, наконец сохраняю их в MySQL, код операции суммы такой:

 consumeRecord.keyBy("gameType")
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("realPumpAmount")
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // handle sink logic
                    }
                });

Теперь я хочу суммировать несколько полей в сущности MQ следующим образом:

consumeRecord.keyBy("gameType")
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("field1","field2")
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // handle sink logic
                    }
                });

Есть ли способ реализовать эту цель?

1 Ответ

1 голос
/ 08 апреля 2020

sum редуктор принимает только одно поле. Вы можете написать такой редуктор самостоятельно:

consumeRecord.keyBy("gameType")
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce((d1, d2) -> {
        d1.field1 += d2.field1;
        d1.field2 += d2.field2;
        return d1;
    })
    .addSink(new SinkFunction<ReportPump>() {
        @Override
        public void invoke(ReportPump value, Context context) throws Exception {
            // handle sink logic
        }
    });
...