Как улучшить производительность поиска, используя sideinputs? - PullRequest
0 голосов
/ 07 ноября 2019

Я использую Apache beam.

Я протестировал две версии моего конвейера: одну с боковым вводом (разных размеров), а другую без.

Когда я запускаюконвейер без бокового ввода, моя работа закончится примерно через 2 минуты. Когда я запускаю свою работу с боковым вводом, моя работа никогда не закончится.

Вот код, который я использую для хранения моего поиска (~ 1M записей)

  PCollectionView<Map<String,String>> sideData = mapData.apply(View.<String,String>asMap());

Вот кодмоего parDo для итерации по каждой строке события (из 10M записей)

  .apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String,Integer> i = c.element();

    String sideInputData = c.sideInput(sideData).get(i.getKey());
    if (sideInputData == null) {
      c.output(i);
    } 

  }

 }).withSideInputs(sideData))

edit:

Я использую кластер Flink, и я заметилчто выходное представление lookup asMap хранится только в одном работнике. Разве это не должно распространяться всеми работниками?

...