Переместите файлы в другую папку GCS и выполните действия после выполнения конвейера луча apache. - PullRequest
0 голосов
/ 20 сентября 2019

Я создал потоковый конвейер луча Apache, который считывает файлы из папок GCS и вставляет их в BigQuery, он отлично работает, но он перерабатывает все файлы, когда я останавливаю и запускаю задание, поэтому все данные будут реплицированы снова.

Так что моя идея состоит в том, чтобы переместить файлы из отсканированного каталога в другой, но я не знаю, как технически это сделать с помощью Apache Beam.

Спасибо


public static PipelineResult run(Options options) {
// Create the pipeline.

        Pipeline pipeline = Pipeline.create(options);

        /*
         * Steps:
         *  1) Read from the text source.
         *  2) Write each text record to Pub/Sub
         */

        LOG.info("Running pipeline");
        LOG.info("Input : " + options.getInputFilePattern());
        LOG.info("Output : " + options.getOutputTopic());

        PCollection<String> collection = pipeline
                .apply("Read Text Data", TextIO.read()
                        .from(options.getInputFilePattern())
                        .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))

                .apply("Write logs", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        LOG.info(c.element());
                        c.output(c.element());
                    }
                }));

        collection.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

        return pipeline.run();
    }

1 Ответ

0 голосов
/ 26 сентября 2019

Несколько советов:

  • От вас обычно не ожидают остановки и перезапуска потокового конвейера.Потоковые конвейеры предназначены для вечной работы и иногда обновляются, если вы хотите внести изменения в логику.
  • Тем не менее, можно использовать FileIO для сопоставления нескольких файлов и перемещать их после того, как онибыли обработаны.

Вы бы написали класс DoFn примерно так: ReadWholeFileThenMoveToAnotherBucketDoFn, который будет читать весь файл, , а затем переместить его в новое ведро.

    Pipeline pipeline = Pipeline.create(options);


    PCollection<FileIO.Match> matches = pipeline
            .apply("Read Text Data", FileIO.match()
                    .filepattern(options.getInputFilePattern())
                    .continuously(Duration.standardSeconds(60), 
                                  Watch.Growth.<String>never()));

    matches.apply(FileIO.readMatches())
           .apply(ParDo.of(new ReadWholeFileThenMoveToAnotherBucketDoFn()))
            .apply("Write logs", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    LOG.info(c.element());
                    c.output(c.element());
                }
            }));

    ....
...