Чтение нескольких файлов во время выполнения (шаблон потока данных) - PullRequest
0 голосов
/ 05 ноября 2018

Я пытаюсь создать шаблон потока данных.

Цель - прочитать ValueProvider, который скажет мне, какие файлы читать. Затем для каждого файла мне нужно прочитать и дополнить данные объектом. Я пробовал это:

        p.apply(Create.of(options.getScheduleBatch()))
            .apply(ParDo.of(StringScheduleBatchToFileReceivedFn.of()))
            .apply(ParDo.of(new DoFn<FileReceived, PCollection<EventRow>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    FileReceived fileReceived = c.element();
                    Broker broker = configuration.getBroker(fileReceived.getBrokerId());
                    PCollection<EventRow> eventRows = p
                            .apply(TextIO.read().from(fileReceived.getUri()))
                            .apply(ParDo.of(StringToEventRowFn.of(broker, fileReceived, options.getJobName())));
                    c.output(eventRows);
                }
            }));

Но у меня есть следующая ошибка:

Не удалось получить кодировщик из CoderRegistry: невозможно предоставить кодировщик для org.apache.beam.sdk.values.PCollection.

Я бы хотел найти лучший способ, чем сам читать файл с помощью клиента gcs.

У вас есть какие-нибудь советы?

С наилучшими пожеланиями

1 Ответ

0 голосов
/ 08 ноября 2018

Проблема:

Вы пытаетесь испустить PCollection как вывод вашего ParDo. Это не работает.

подробности:

PCollection - это абстракция, представляющая потенциально неограниченный набор элементов. Применение преобразования к PCollection дает вам еще один PCollection. Одно из преобразований, которое вы можете применить - это ParDo. ParDos делать поэлементные преобразования. Применяя ParDo, вы выражаете: «Возьми это PCollection и сделай еще один, преобразовав все элементы в нем, применив это ParDo».

Одной из вещей, которая делает обработку эффективной, является способность выполнять все параллельно, например преобразование множества элементов одновременно на нескольких исполнительных узлах (например, виртуальных машинах / машинах) путем запуска одинакового ParDo на каждом из разных элементов. И вы не можете явно контролировать, произойдет ли какое-либо конкретное преобразование на том же самом исполнительном или другом узле, это является частью базовой системы, как оптимизировать это. Но чтобы включить это, вы должны иметь возможность передавать элементы между узлами выполнения и сохранять их для агрегирования. Beam поддерживает это, требуя от вас Coders для элементов. Кодеры - это механизм сериализации, который позволяет Beam преобразовывать элемент (представленный объектом Java) в байтовый массив, который затем может быть передан следующему преобразованию (которое потенциально может произойти на другом компьютере) или в хранилище. Например, Beam должен иметь возможность кодировать элементы, которые вы выводите из ParDo. Beam знает, как сериализовать некоторые типы, но он не может определить все автоматически, вы должны предоставить кодировщики для чего-то, что не может быть выведено.

Ваш пример выглядит следующим образом: возьмите немного PCollection и преобразуйте его в другой PCollection, применив ParDo к каждому элементу, и это ParDo преобразует каждый элемент ввода в PCollection. Это означает, что как только элемент обрабатывается ParDo, вы должны его кодировать и передать следующему преобразованию. И здесь возникает вопрос - как вы кодируете и передаете (потенциально неограниченный) PCollection следующему преобразованию или сохраняете его для агрегирования?

Beam не поддерживает это в данный момент, поэтому вам нужно будет выбрать другой подход.

В вашем конкретном случае я не уверен, что в Beam из коробки вы можете просто использовать поток имен файлов и конвертировать их в суб-конвейеры для обработки строк в файлах.

Обходные:

Несколько подходов, которые я могу придумать, чтобы обойти это ограничение:

  • Если имена ваших файлов имеют известный шаблон, вы можете указать шаблон в TextIO, и он сможет читать новые файлы по мере их поступления.

  • Если у них нет известного шаблона, вы можете написать другой конвейер, чтобы переименовать имена файлов, чтобы они имели общий шаблон имен, а затем использовать шаблон в TextIO в другом конвейере.

  • Если это возможно (например, файлы помещаются в память), вы, вероятно, можете прочитать содержимое файлов с помощью чистого API java File, разбить их на строки и выдать эти строки за один ParDo. Затем вы можете применить тот же StringToEventRowFn в следующем ParDo.

Надеюсь, это поможет

...