Apache Beam TextIO не работает с Spark Runner - PullRequest
3 голосов
/ 23 мая 2019

Я пытаюсь запустить свой код Beam на Spark для POC.Я запускаю приложение в Google Cloud Dataproc для тестирования.Это очень простой тест для чтения из темы PubSub и записи сообщения в корзину в Google Cloud Storage.Кластер Dataproc имеет правильную версию для Spark и имеет доступ к другим API GCP.

Я также пытался использовать FileIO, но это тоже не сработало.Я попытался опубликовать в другой теме PubSub вместо того, чтобы писать, и это сработало, но это не мой вариант использования.Я пробовал печатать перед записью в TextIO, и это подтвердило, что я могу читать сообщения из PubSub.

Вот конвейер:

PCollection<String> messages = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(sub))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
 messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());

pipeline.run();

Я не вижу никаких журналов при выводе задания Dataproc.Никаких ошибок или вообще ничего.В ведре также нет файла.

1 Ответ

1 голос
/ 24 мая 2019

Я обнаружил, что это проблема запуска. Вот подробное обсуждение:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E

Я исправил это, изменив свое оконное преобразование на триггер раннего запуска:

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
           .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                     .alignedTo(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.standardSeconds(10))
                .discardingFiredPanes())
...