Контекст: У нас есть несколько файлов схемы в облачном хранилище. В нашей работе Dataflow нам нужно обращаться к этим файлам схемы для преобразования наших данных. Эти файлы схемы меняются ежедневно / еженедельно. Наш источник данных - PubSub, и мы помещаем сообщения PubSub в фиксированное окно продолжительностью 1 минута. Файлы схемы, которые нам нужны, хорошо вписываются в память, их размер составляет около 90 МБ.
Что я пробовал: Ссылаясь на , это делает c с Apache Луч, мы создали боковой ввод, который записывает в глобальное окно с GenerateSequence, например так:
// Creates a side input that refreshes the schema every minute
PCollectionView<Map<String, byte[]>> dataBlobView =
pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1L)))
.apply(Window.<Long>into(new GlobalWindows()).triggering(
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, byte[]>>() {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
byte[] avroSchemaBlob = getAvroSchema();
byte[] fileDescriptorSetBlob = getFileDescriptorSet();
byte[] depsBlob = getFileDescriptorDeps();
Map<String, byte[]> dataBlobs = ImmutableMap.of(
"version", Longs.toByteArray(ctx.element().byteValue()),
"avroSchemaBlob", avroSchemaBlob,
"fileDescriptorSetBlob", fileDescriptorSetBlob,
"depsBlob", depsBlob);
ctx.output(dataBlobs);
}
}))
.apply(View.asSingleton());
"getAvroSchema", "getFileDescriptorSet" и "getFileDescriptorDeps" читают файлы как байты [] из облачного хранилища.
Однако этот подход не удался из исключения:
org. apache .beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java .lang.IllegalArgumentException: PCollection с более чем одним элементом, доступным как одноэлементное представление.
Затем я попытался написать свою собственную функцию Combine Globally, например:
static class GetLatestVersion implements SerializableFunction<Iterable<Map<String, byte[]>>, Map<String, byte[]>> {
@Override
public Map<String, byte[]> apply(Iterable<Map<String, byte[]>> versions) {
Map<String, byte[]> result = Maps.newHashMap();
Long maxVersion = Long.MIN_VALUE;
for (Map<String, byte[]> version: versions){
Long currentVersion = Longs.fromByteArray(version.get("version"));
logger.info("Side input version: " + currentVersion);
if (currentVersion > maxVersion) {
result = version;
maxVersion = currentVersion;
}
}
return result;
}
}
Но это все еще вызывает то же исключение ........
Затем я наткнулся на это и это Архив электронной почты Beam и похоже на то, что предлагается в Beam do c do не работает. И я должен использовать MultiMap, чтобы избежать исключения, с которым я столкнулся выше. С MultiMap мне также придется перебирать значения и иметь свой собственный логин c, чтобы выбрать желаемое значение (самое последнее).
Мои вопросы:
- Почему я все еще получаю исключение "PCollection с более чем одним элементом, доступным как одноэлементное представление", даже после того, как я глобально объединяю все в 1 результат?
- Если I go с подходом MultiMap, Разве работа не закончится в памяти? Потому что каждый день мы в основном увеличиваем MultiMap на 90 МБ (размер нашего блоба данных), если только у Dataflow нет какой-то умной реализации MultiMap за сценой.
- Каков рекомендуемый способ сделать это?
Спасибо