Apache Beam не сохраняет неограниченные данные в текстовый файл - PullRequest
0 голосов
/ 21 сентября 2018

Я создал конвейер для сохранения сообщений Google Cloud Pubsub в текстовые файлы с использованием Apache Beam и Java.Всякий раз, когда я запускаю конвейер в потоке данных Google с --runner=DataflowRunner, сообщения сохраняются правильно.

Однако, когда я запускаю тот же конвейер с --runner=DirerctRunner, сообщения не сохраняются.

Я могунаблюдайте за событиями, идущими через конвейер, но ничего не происходит.

Конвейер - это код ниже:

public static void main(String[] args) {
    ExerciseOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
      .apply("Read Messages from Pubsub",
        PubsubIO
          .readStrings()
          .fromTopic(options.getTopicName()))

      .apply("Set event timestamp", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.outputWithTimestamp(context.element(), Instant.now());
        }
      }))

      .apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(5))))

      .apply("Write to File",
        TextIO
          .write()
          .withWindowedWrites()
          .withNumShards(1)
          .to(options.getOutputPrefix()));

    pipeline.run();
  }

Что я делаю не так?Можно ли запустить этот трубопровод локально?

1 Ответ

0 голосов
/ 05 октября 2018

Я столкнулся с той же проблемой, что и вы, во время тестирования конвейера.PubSubIO не работает правильно с DirectRunner и TextIO.

Я нашел какой-то обходной путь для этой проблемы с триггером.

.apply(
                    "2 minutes window",
                    Window
                            .configure()
                            .triggering(
                                    Repeatedly.forever(
                                            AfterFirst.of(
                                                AfterPane.elementCountAtLeast(10),
                                                AfterProcessingTime
                                                        .pastFirstElementInPane()
                                                        .plusDelayOf(Duration.standardMinutes(2))
                                            )
                                    )
                            )
                            .into(
                                FixedWindows.of(
                                        Duration.standardMinutes(2)
                                )
                            )
            )

Таким образом, файлы записываются так, как должны.Надеюсь, это кому-нибудь поможет.

...