флинк счет отдельная проблема - PullRequest
0 голосов
/ 09 марта 2020

Теперь мы используем акробатическое окно, чтобы считать разные. Проблема, которую мы имеем, заключается в том, что если мы будем увеличивать наше падающее окно изо дня в месяц, у нас не может быть числа по состоянию на текущий отчетливый счет. Это означает, что если мы установим аккордное окно равным 1 месяцу, число, которое мы получаем, будет указываться с 1-го числа каждого месяца. Как я могу получить текущий отчетливый счет (сейчас 9 марта)?

package flink.trigger;

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Date;

public class CustomCountDistinctTigger extends Trigger<Object, TimeWindow> {

    private final ReducingStateDescriptor<Long> timeState =
            new ReducingStateDescriptor<>("fire-interval", new DistinctCountAggregateFunction(), LongSerializer.INSTANCE);
    private long interval;


    public CustomCountDistinctTigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
//        System.out.println("onProcessingTime called at "+System.currentTimeMillis() );
//        return TriggerResult.FIRE_AND_PURGE;
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(new Date()));
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);

        if(window.maxTimestamp() == time) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        else if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

    }

}


distinct count:
DataStreamSink<Tuple2<String, Integer>> finalResultStream = keyedStream
                            .flatMap(new KPIDistinctDataFlatMapFunction(inputSchema))
                            .map(new SwapMap())
                            .keyBy(new WordKeySelector())
                            .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                            .trigger(new CustomCountDistinctTigger(1 * 60 * 6000))
                            .aggregate(new DistinctCountAggregateFunction())
                            .print("final print");

1 Ответ

1 голос
/ 09 марта 2020

Вы можете определить пользовательский триггер, который возвращает FIRE один раз в день для запуска промежуточных результатов, а затем выполняет FIRE_AND_PURGE в конце месяца, чтобы закрыть окно.

Каждый раз, когда триггер возвращает FIRE ваше окно оценивается путем вызова метода process() вашего ProcessWindowFunction, после чего он может дать результаты с предоставленным Collector. FIRE_AND_PURGE оценивает окно в последний раз, а затем уничтожает его.

См. Также ответы на этот вопрос - Как отобразить промежуточные результаты в потоковом потоке-etl? - который охватывал связанный топи c.

...