Я использую Apache beam.
Я протестировал две версии моего конвейера: одну с боковым вводом (разных размеров), а другую без.
Когда я запускаюконвейер без бокового ввода, моя работа закончится примерно через 2 минуты. Когда я запускаю свою работу с боковым вводом, моя работа никогда не закончится.
Вот код, который я использую для хранения моего поиска (~ 1M записей)
PCollectionView<Map<String,String>> sideData = mapData.apply(View.<String,String>asMap());
Вот кодмоего parDo для итерации по каждой строке события (из 10M записей)
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String,Integer> i = c.element();
String sideInputData = c.sideInput(sideData).get(i.getKey());
if (sideInputData == null) {
c.output(i);
}
}
}).withSideInputs(sideData))
edit:
Я использую кластер Flink, и я заметилчто выходное представление lookup asMap хранится только в одном работнике. Разве это не должно распространяться всеми работниками?