Как группировать кортежи в окне в штормовой трезубец? - PullRequest
0 голосов
/ 28 января 2020

Мне нужно выполнить групповую обработку для кортежей, которые попадают в окно tumblingWindow (групповая работа основана на двух других полях плюс их временное окно) в трезубце Storm, а затем применить к ним функцию агрегирования. Следующий код объединяет все кортежи в окне:

topology.newStream("fixed_spout", fixedSpout).each(new Fields("flows"), new  ExtractflowValues2("IPV4_SRC_ADDR"),new Fields("sa"))
    .each(new Fields("flows"),new ExtractflowValues2("L4_DST_PORT"),new Fields("dp"))
    .each(new Fields("flows"),new ExtractflowValues2("IPV4_DST_ADDR"),new Fields("da"))
    .tumblingWindow(BaseWindowedBolt.Duration.seconds(5), wsf, new Fields(groupbyFileds), new countDistinct("da"), new Fields("count"));

В приведенном выше коде я сначала извлекаю 3 поля (sa, da и dp) из моих кортежей, а затем помещаю кортежи в windows продолжительность 5 секунд и подсчитать количество их различных "да" для каждого окна. Однако, что мне действительно нужно, это поместить кортежи в windows продолжительностью 5 секунд и сгруппировать эти кортежи с помощью полей «sa» и «dp», а затем подсчитать количество их различных «da». , Как мне этого добиться?

...