Обработка выходных данных из потока данных flink - PullRequest
0 голосов
/ 25 февраля 2019

ниже - псевдокод обработки моего потока.

Datastream env = StreamExecutionEnvironment.getExecutionEnvironment()    
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Datastream stream = env.addSource() .map(mapping to java object) 
    .filter(filter for specific type of events) 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)){})
    .timeWindowAll(Time.seconds(10));

//collect all records.
Datastream windowedStream = stream.apply(new AllWindowFunction(...))

Datastream processedStream = windowedStream.keyBy(...).reduce(...)

String outputPath = ""

final StreamingFileSink sink = StreamingFileSink.forRowFormat(...).build();

processedStream.addSink(sink)

Приведенный выше поток кода создает несколько файлов, и каждый файл имеет записи разных окон, я полагаю.Например, записи в каждом файле имеют временные метки, которые находятся в диапазоне 30-40 секунд, тогда как время окна составляет всего 10 секунд.Мой ожидаемый шаблон вывода записывает данные каждого окна в отдельный файл.Любые ссылки или материалы по этому вопросу будут очень полезны.

1 Ответ

0 голосов
/ 25 февраля 2019

Посмотрите на интерфейс BucketAssigner .Он должен быть достаточно гибким, чтобы удовлетворить ваши потребности.Вам просто нужно убедиться, что ваши потоковые события содержат достаточно информации, чтобы определить путь, по которому вы хотите, чтобы они были записаны.

...