Итак, вот такое рабочее решение.Скорее всего, я буду редактировать его для любых ошибок, которые я могу сделать в понимании вопроса.(PS код шаблона находится в Java).Предполагая, что input
является вашим источником потока
PCollection<Messages> msgs = input.apply(Window.<Messages>into(
FixedWindows.of(Duration.standardSeconds(1))
.triggering(AfterWatermark.pastEndOfWindow()
// fire the moment you see an element
.withEarlyFirings(AfterPane.elementCountAtLeast(1))
//optional since you have small window
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(60))
.discardingFiredPanes());
Это позволит вам прочитать поток Messages
, который может быть либо строкой, либо HashMap, либо даже списком.Заметьте, что вы говорите лучу открывать окно для каждого элемента, который он получает, и вы установили максимальное значение окна в 1 секунду.Вы можете изменить это, если хотите запускать каждые 10 сообщений и минутное окно и т. Д.
После этого вам нужно написать 2 класса, которые расширяют DoFn в основном
PCollection<Element> top = msgs.apply(ParDo.of(new ExtractElements()))
.apply(ParDo.of(new TopElement()));
Где Element
может быть String, int, double и т. Д.
Наконец, вы можете каждый Element
на хранение с:
top.apply(ParDo.of(new ParsetoString()))
.apply(TextIO.write().withWindowedWrites()
.withNumShards(1)
.to(filename));
Следовательно, у вас будет примерно 1 файл для каждого сообщения, которое может быть много.Но, к сожалению, вы не можете добавить в файл.Если вы не сделаете управление окнами, где вы группируете все элементы в один список и записываете в него.
Конечно, есть хакерский способ сделать это без окон, и я объясню, если этот вариант использования не работает с вами (или если вам любопытно)
Позвольтея знаю, если я что-то пропустил!:)