Я работаю над сценарием использования, в котором у меня есть два неограниченных потока, и я хочу выполнить левое соединение с этими потоками. Использование окна фиксированного размера в 5 минут без опозданий. Для объединения я использую библиотеку расширений java. Но после Join это не выдаёт результаты. Код для того же:
PCollection<KV<String, KV<GenericRecord, GenericRecord>>> joinedDatasets = Join.leftOuterJoin(aById, bById, GenericRecord);
PCollection<GenericRecord> result = joinedDatasets.apply(ParDo.of(new DoFn<KV<String, KV<GenericRecord, GenericRecord>>, GenericRecord>() {
@ProcessElement
public void processElement(@Element KV<String, KV<GenericRecord, GenericRecord>> element, OutputReceiver<GenericRecord> out) {
LogHelper.info(element.getKey());
//some processing logic
}
}));
Я пробовал cogrpbykey, но поведение такое же. Работает на directrunner