Боковой ввод в глобальном окне как медленно меняющиеся вопросы кеша - PullRequest
2 голосов
/ 23 января 2020

Контекст: У нас есть несколько файлов схемы в облачном хранилище. В нашей работе Dataflow нам нужно обращаться к этим файлам схемы для преобразования наших данных. Эти файлы схемы меняются ежедневно / еженедельно. Наш источник данных - PubSub, и мы помещаем сообщения PubSub в фиксированное окно продолжительностью 1 минута. Файлы схемы, которые нам нужны, хорошо вписываются в память, их размер составляет около 90 МБ.

Что я пробовал: Ссылаясь на , это делает c с Apache Луч, мы создали боковой ввод, который записывает в глобальное окно с GenerateSequence, например так:

    // Creates a side input that refreshes the schema every minute
PCollectionView<Map<String, byte[]>> dataBlobView =
    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1L)))
        .apply(Window.<Long>into(new GlobalWindows()).triggering(
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(ParDo.of(new DoFn<Long, Map<String, byte[]>>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) throws Exception {
            byte[] avroSchemaBlob = getAvroSchema();
            byte[] fileDescriptorSetBlob = getFileDescriptorSet();
            byte[] depsBlob = getFileDescriptorDeps();
            Map<String, byte[]> dataBlobs = ImmutableMap.of(
                "version", Longs.toByteArray(ctx.element().byteValue()),
                "avroSchemaBlob", avroSchemaBlob,
                "fileDescriptorSetBlob", fileDescriptorSetBlob,
                "depsBlob", depsBlob);
            ctx.output(dataBlobs);
          }
        }))
        .apply(View.asSingleton());

"getAvroSchema", "getFileDescriptorSet" и "getFileDescriptorDeps" читают файлы как байты [] из облачного хранилища.

Однако этот подход не удался из исключения:

org. apache .beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java .lang.IllegalArgumentException: PCollection с более чем одним элементом, доступным как одноэлементное представление.

Затем я попытался написать свою собственную функцию Combine Globally, например:

static class GetLatestVersion implements SerializableFunction<Iterable<Map<String, byte[]>>, Map<String, byte[]>> {
@Override
public Map<String, byte[]> apply(Iterable<Map<String, byte[]>> versions) {
  Map<String, byte[]> result = Maps.newHashMap();
  Long maxVersion = Long.MIN_VALUE;
  for (Map<String, byte[]> version: versions){
    Long currentVersion = Longs.fromByteArray(version.get("version"));
    logger.info("Side input version: " + currentVersion);
    if (currentVersion > maxVersion) {
      result = version;
      maxVersion = currentVersion;
    }
  }
  return result;
}

}

Но это все еще вызывает то же исключение ........

Затем я наткнулся на это и это Архив электронной почты Beam и похоже на то, что предлагается в Beam do c do не работает. И я должен использовать MultiMap, чтобы избежать исключения, с которым я столкнулся выше. С MultiMap мне также придется перебирать значения и иметь свой собственный логин c, чтобы выбрать желаемое значение (самое последнее).

Мои вопросы:

  1. Почему я все еще получаю исключение "PCollection с более чем одним элементом, доступным как одноэлементное представление", даже после того, как я глобально объединяю все в 1 результат?
  2. Если I go с подходом MultiMap, Разве работа не закончится в памяти? Потому что каждый день мы в основном увеличиваем MultiMap на 90 МБ (размер нашего блоба данных), если только у Dataflow нет какой-то умной реализации MultiMap за сценой.
  3. Каков рекомендуемый способ сделать это?

Спасибо

...