Функция TimeWindowAll не вызывается в Apache Flink - PullRequest
0 голосов
/ 31 мая 2018

У меня очень простая настройка потокового конвейера в apache flink, конвейер работает, и мне удалось применить функцию processFunction к потоку входных данных следующим образом:

    DataStream<MeasurementData> data = env.addSource(consumer);
    DataStream<MeasurementData> dataProcessed =data.process(new FFT());
    dataProcessed.print();
    dataProcessed.addSink(new FlinkKafkaProducer011<>(
            "localhost:9092",      // Kafka     broker host:port
            OUTPUT_TOPIC,       // Topic to write to
            new MeasurementDataSchema())  // Serializer
    );  

Теперь я хотел бы применитьProcessWindowFunction, работающая с окнами определенного времени, вместо применения функции для каждого входящего элемента данных.Я попробовал это так:

        DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
       .process(new MyProcessWindowFunction());

И определение MyProcessWindowFunction ():

public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {

    public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
        long count = 0;
        for (MeasurementData data : input) {
            for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
                matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
            }
            count++;
            out.collect(data);
        }

    }
}

Но эта функция, кажется, никогда не вызывается.Я попытался разместить там операторы печати, а также прошел через всю программу с помощью отладчика.Я что-то пропустил?Любая подсказка приветствуется.

1 Ответ

0 голосов
/ 31 мая 2018

Обнаружена проблема: среда была настроена на использование EventTime вместо processingTime, в то время как мои данные не содержат отметок времени события.

...