Проблемы с динамическими назначениями в потоке данных - PullRequest
5 голосов
/ 18 апреля 2019

У меня есть задание Dataflow, которое считывает данные из pubsub и на основе времени и имени файла записывает содержимое в GCS, где путь к папке основывается на YYYY / MM / DD. Это позволяет создавать файлы в папках на основе даты и использовать Apache Beam FileIO и Dynamic Destinations.

Около двух недель назад я заметил необычное накопление неподтвержденных сообщений. После перезапуска задания df ошибки исчезли, и новые файлы записывались в GCS.

Через пару дней запись снова остановилась, за исключением этого времени были ошибки, утверждающие, что обработка зависла. После некоторых достоверных исследований SO я обнаружил, что это, вероятно, вызвано из-за проблемы взаимоблокировки в Beam до 2.90, потому что она использовала библиотеку Conscrypt в качестве поставщика безопасности по умолчанию. Итак, я обновился до Beam 2.11 с Beam 2.8.

Еще раз, это сработало, пока не сработало. Я более внимательно посмотрел на ошибку и заметил, что у нее есть проблема с объектом SimpleDateFormat, который не является потокобезопасным. Итак, я переключился на использование Java.time и DateTimeFormatter, который является потокобезопасным. Это сработало, пока не сработало. Однако на этот раз ошибка немного отличалась и не указывала ни на что в моем коде: Ошибка приведена ниже.

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)

Эта ошибка стала возникать примерно через 5 часов после развертывания задания и со временем увеличивается. Письмо значительно замедлилось в течение 24 часов. У меня 60 работников, и я подозреваю, что один работник выходит из строя каждый раз, когда возникает ошибка, которая в итоге убивает работу.

В моем писателе я анализирую строки для определенных ключевых слов (возможно, это не лучший способ), чтобы определить, к какой папке он принадлежит. Затем я продолжаю вставлять файл в GCS с определенным именем файла. Вот код, который я использую для своего писателя:

Функция разделения предоставляется следующим образом:

@SuppressWarnings("serial")
public static class datePartition implements SerializableFunction<String, String> {     
    private String filename;

    public datePartition(String filename) {
        this.filename = filename;
    }

    @Override
    public String apply(String input) {

        String folder_name = "NaN";             
        String date_dtf    = "NaN";     
        String date_literal = "NaN";
        try {
            Matcher foldernames = Pattern.compile("\"foldername\":\"(.*?)\"").matcher(input);
            if(foldernames.find()) {
                folder_name = foldernames.group(1);
            }
            else {
                Matcher folderid = Pattern.compile("\"folderid\":\"(.*?)\"").matcher(input);
                if(folderid.find()) {
                    folder_name = folderid.group(1);
                }   
            }

            Matcher date_long = Pattern.compile("\"timestamp\":\"(.*?)\"").matcher(input);
            if(date_long.find()) {
                date_literal = date_long.group(1);
                if(Utilities.isNumeric(date_literal)) {
                    LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.valueOf(date_literal)), ZoneId.systemDefault());
                    date_dtf = date.format(dtf);                        
                }
                else {
                    date_dtf = date_literal.split(":")[0].replace("-", "/").replace("T", "/");
                }
            }
            return folder_name + "/" + date_dtf + "h/" + filename;
        }

        catch(Exception e) {
            LOG.error("ERROR with either foldername or date");
            LOG.error("Line : " + input);
            LOG.error("folder : " + folder_name);
            LOG.error("Date : " + date_dtf);

            return folder_name + "/" + date_dtf + "h/" + filename;
        }           
    }
}

А фактическое место, где трубопровод развернут и запущен, можно найти ниже:

public void streamData() {

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
            .apply(options.getWindowDuration() + " Window",
                        Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                                  .triggering(AfterWatermark.pastEndOfWindow()) 
                                  .discardingFiredPanes()
                                  .withAllowedLateness(parseDuration("24h")))
                .apply(new GenericFunctions.extractMsg())
                .apply(FileIO.<String, String>writeDynamic()
                                 .by(new datePartition(options.getOutputFilenamePrefix()))
                                 .via(TextIO.sink())
                                 .withNumShards(options.getNumShards())
                                 .to(options.getOutputDirectory())
                                 .withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
                                 .withDestinationCoder(StringUtf8Coder.of()));

    pipeline.run();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...