Как решить исключение дубликатов значений при создании PCollectionView> - PullRequest
0 голосов
/ 29 января 2019

Я устанавливаю медленно меняющуюся карту поиска в моем конвейере Apache-Beam.Он постоянно обновляет карту поиска.Для каждого ключа в карте поиска я получаю последнее значение в глобальном окне с режимом накопления.Но это всегда соответствует исключению:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Что-то не так с этим фрагментом кода?

Если я использую .discardingFiredPanes(), я потеряю информацию в последнемemit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap());

Пример триггера ввода:

t1 : KV<k1,v1> KV< k2,v2>
t2 : KV<k1,v1>

accumulatingFiredPanes => ожидаемый результат при t2 => KV (k1, v1), KV (k2, v2), но не удалосьиз-за дублированного исключения

discardingFiredPanes => ожидаемый результат при t2 => KV (k1, v1) Успех

1 Ответ

0 голосов
/ 11 февраля 2019

В частности, в отношении view.asMap и накопления панелей обсуждение в комментариях:

Если вы хотите использовать боковой ввод View.asMap (например, когда источником элементов карты являетсясам распределен - часто потому, что вы создаете побочный ввод из выходных данных предыдущего преобразования), есть некоторые другие факторы, которые необходимо учитывать: View.asMap само является агрегацией, она унаследует триггеры и накопит свой ввод,В этом конкретном шаблоне установка конвейера в режим аккумулирования панелей перед этим преобразованием приведет к дублированию ошибок ключа, даже если преобразование, такое как Latest.perKey, используется до преобразования View.asMap.

Учитывая, что чтение обновляет всю карту, тогда я думаю, что использование View.asSingleton будет лучшим подходом для этого варианта использования.

Некоторые общие замечания относительно этого шаблона, которые, будем надеяться, будут полезны и для других:

Для этого шаблона мы можем использовать исходное преобразование GenerateSequence, чтобы периодически выдавать значение, например, один раздень.Передайте это значение в глобальное окно через управляемый данными триггер, который активируется для каждого элемента.В DoFn используйте этот процесс в качестве триггера для извлечения данных из вашего ограниченного источника Create вашего SideInput для использования в нисходящих преобразованиях.

Важно отметить, что, поскольку этот шаблон использует сторону глобального окназапуск входа во время обработки, сопоставление с элементами, обрабатываемыми во время события, будет недетерминированным.Например, если у нас есть основной конвейер, в котором отображается время события, версия SideInput View, которую увидят эти окна, будет зависеть от последнего триггера, который сработал во время обработки, а не от времени события.

Также важно отметить, что в общем случае боковой ввод должен быть чем-то, что умещается в памяти.

Java (SDK 2.9.0):

В приведенном ниже примере боковые входные данные обновляются с очень короткими интервалами, так что эффекты можно легко увидеть.Ожидается, что боковой ввод обновляется медленно, например, каждые несколько часов или один раз в день.

В приведенном ниже примере кода мы используем Map, который мы создаем в DoFn, который становится View.asSingleton, это рекомендуемый подход для этого шаблона.

Пример ниже иллюстрирует шаблон, обратите внимание, что View.asSingleton перестраивается при каждом обновлении счетчика.

public static void main(String[] args) {

 // Create pipeline
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(PipelineOptions.class);

 // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
 // Run in debug mode to see the output
 Pipeline p = Pipeline.create(options);

 // Create slowly updating sideinput

 PCollectionView<Map<String, String>> map = p
     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())

     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement public void process(@Element Long input,
           OutputReceiver<Map<String, String>> o) {
         // Do any external reads needed here...
         // We will make use of our dummy external service.
         // Every time this triggers, the complete map will be replaced with that read from 
         // the service.
         o.output(DummyExternalService.readDummyData());
       }

     })).apply(View.asSingleton());

 // ---- Consume slowly updating sideinput

 // GenerateSequence is only used here to generate dummy data for this illustration.
 // You would use your real source for example PubSubIO, KafkaIO etc...
 p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
     .apply(Sum.longsGlobally().withoutDefaults())
     .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

       @ProcessElement public void process(ProcessContext c) {
         Map<String, String> keyMap = c.sideInput(map);
         c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

  LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));

       }
     }).withSideInputs(map));

 p.run();
}

public static class DummyExternalService {

 public static Map<String, String> readDummyData() {

   Map<String, String> map = new HashMap<>();
   Instant now = Instant.now();

   DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

   map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
   map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

   return map;

 }
}
...