Apache Flink - ежедневная агрегация почасовых агрегированных данных - PullRequest
0 голосов
/ 17 октября 2018

У меня есть оконный почасовой агрегированный DataStream.

DataStream <RawData> ds = .....

  SingleOutputStreamOperator<HourlyAggregated> hourly =  
  ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
             .timeWindow(Time.hours(1))
             .aggregate(new HourlyCountersAggregation());

Каждый час этот DataStream <HourlyAggregated> опускается на Кассандру.

Я хочу также ежедневное агрегирование одного и того же DataStream <HourlyAggregated>.

Какой рекомендуемый способ сделать это из ежечасного агрегированного DataStream, чтобы избежать сохранения большого ежедневного состояния потоковой передачи из DataStream <RawData> ds....

1 Ответ

0 голосов
/ 17 октября 2018

Я думаю, вы могли бы попытаться разветвить график задания, чтобы использовать вывод ежечасных агрегированных данных как в приемнике, так и в следующем окне (чтобы вы могли выполнять ежедневную агрегацию поверх почасовой агрегации, сохраняя вычислениявремя).Что-то вроде:

SingleOutputStreamOperator<HourlyAggregated> hourly = ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
         .timeWindow(Time.hours(1))
         .aggregate(new HourlyCountersAggregation());

hourly.addSink(...);

SingleOutputStreamOperator<HourlyAggregated> daily = hourly.timeWindow(Time.hours(24))
         .trigger(CountEventTrigger.of(24))
         .aggregate(...);
...