Apache Flink уменьшает результаты во многих значениях вместо одного - PullRequest
0 голосов
/ 05 декабря 2018

Я пытаюсь реализовать редукцию WindowedStream, например, так:

                .keyBy(t -> t.key)
            .timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
            .reduce(new ReduceFunction<TwitterSentiments>() {
                @Override
                public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
                    t2.positive += t1.positive;
                    t2.neutral += t1.neutral;
                    t2.negative += t1.negative;

                    return t2;
                }
            });

Проблема, с которой я сталкиваюсь, заключается в том, что когда я вызываю stream.print (), я получаю много значений (выглядит как однона объект TwitterSentiments, вместо одного агрегатного объекта.

Я также попытался использовать функцию AggregationFunction, как эта, с той же проблемой:

                .aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                @Override
                public Tuple3<Long, Long, Long> createAccumulator() {
                    return new Tuple3<Long, Long, Long>(0L,0L,0L);
                }

                @Override
                public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator.f0 + ts.positive.longValue(),
                            accumulator.f1 + ts.neutral.longValue(),
                            accumulator.f2 + ts.negative.longValue()
                    );
                }

                @Override
                public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
                    return accumulator;
                }

                @Override
                public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator1.f0 + accumulator2.f0,
                            accumulator1.f1 + accumulator2.f1,
                            accumulator1.f2 + accumulator2.f1);
                }
            });

Каковы причины, по которым stream.print () все равно будет выводить много записей после этих агрегатов?

Ответы [ 2 ]

0 голосов
/ 06 декабря 2018

Если вам не нужен результат для каждого ключа, вы можете использовать timeWindowAll для получения одного результата.Однако timeWindowAll не работает параллельно.Если вы хотите вычислить результат более масштабируемым образом, вы можете сделать это:

    .keyBy(t -> t.key)
    .timeWindow(<time specification>)
    .reduce(<reduce function>)
    .timeWindowAll(<same time specification>)
    .reduce(<same reduce function>)

Вы можете ожидать, что среда выполнения Flink будет достаточно умной, чтобы выполнить эту параллельную предварительную агрегацию для вас (при условии, что вы используетеReduceFunction или AggregateFunction), но это не так.

0 голосов
/ 06 декабря 2018

Похоже, я неправильно понял причину использования ключей.В моем случае мне не нужно KeyedStream, так как я хочу только один вывод в минуту, который состоит из всех записей, сведенных к одному значению.В итоге я использовал .timeWindowAll, SingleOutputStreamOperator, и теперь мой редуктор работает, как и ожидалось.

...