У меня очень простая настройка потокового конвейера в apache flink, конвейер работает, и мне удалось применить функцию processFunction к потоку входных данных следующим образом:
DataStream<MeasurementData> data = env.addSource(consumer);
DataStream<MeasurementData> dataProcessed =data.process(new FFT());
dataProcessed.print();
dataProcessed.addSink(new FlinkKafkaProducer011<>(
"localhost:9092", // Kafka broker host:port
OUTPUT_TOPIC, // Topic to write to
new MeasurementDataSchema()) // Serializer
);
Теперь я хотел бы применитьProcessWindowFunction, работающая с окнами определенного времени, вместо применения функции для каждого входящего элемента данных.Я попробовал это так:
DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
.process(new MyProcessWindowFunction());
И определение MyProcessWindowFunction ():
public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {
public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
long count = 0;
for (MeasurementData data : input) {
for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
}
count++;
out.collect(data);
}
}
}
Но эта функция, кажется, никогда не вызывается.Я попытался разместить там операторы печати, а также прошел через всю программу с помощью отладчика.Я что-то пропустил?Любая подсказка приветствуется.