У меня есть задание Dataflow, которое считывает данные из pubsub и на основе времени и имени файла записывает содержимое в GCS, где путь к папке основывается на YYYY / MM / DD. Это позволяет создавать файлы в папках на основе даты и использовать Apache Beam FileIO
и Dynamic Destinations
.
Около двух недель назад я заметил необычное накопление неподтвержденных сообщений. После перезапуска задания df ошибки исчезли, и новые файлы записывались в GCS.
Через пару дней запись снова остановилась, за исключением этого времени были ошибки, утверждающие, что обработка зависла. После некоторых достоверных исследований SO я обнаружил, что это, вероятно, вызвано из-за проблемы взаимоблокировки в Beam до 2.90, потому что она использовала библиотеку Conscrypt в качестве поставщика безопасности по умолчанию. Итак, я обновился до Beam 2.11 с Beam 2.8.
Еще раз, это сработало, пока не сработало. Я более внимательно посмотрел на ошибку и заметил, что у нее есть проблема с объектом SimpleDateFormat, который не является потокобезопасным. Итак, я переключился на использование Java.time и DateTimeFormatter, который является потокобезопасным. Это сработало, пока не сработало. Однако на этот раз ошибка немного отличалась и не указывала ни на что в моем коде:
Ошибка приведена ниже.
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Эта ошибка стала возникать примерно через 5 часов после развертывания задания и со временем увеличивается. Письмо значительно замедлилось в течение 24 часов. У меня 60 работников, и я подозреваю, что один работник выходит из строя каждый раз, когда возникает ошибка, которая в итоге убивает работу.
В моем писателе я анализирую строки для определенных ключевых слов (возможно, это не лучший способ), чтобы определить, к какой папке он принадлежит. Затем я продолжаю вставлять файл в GCS с определенным именем файла. Вот код, который я использую для своего писателя:
Функция разделения предоставляется следующим образом:
@SuppressWarnings("serial")
public static class datePartition implements SerializableFunction<String, String> {
private String filename;
public datePartition(String filename) {
this.filename = filename;
}
@Override
public String apply(String input) {
String folder_name = "NaN";
String date_dtf = "NaN";
String date_literal = "NaN";
try {
Matcher foldernames = Pattern.compile("\"foldername\":\"(.*?)\"").matcher(input);
if(foldernames.find()) {
folder_name = foldernames.group(1);
}
else {
Matcher folderid = Pattern.compile("\"folderid\":\"(.*?)\"").matcher(input);
if(folderid.find()) {
folder_name = folderid.group(1);
}
}
Matcher date_long = Pattern.compile("\"timestamp\":\"(.*?)\"").matcher(input);
if(date_long.find()) {
date_literal = date_long.group(1);
if(Utilities.isNumeric(date_literal)) {
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.valueOf(date_literal)), ZoneId.systemDefault());
date_dtf = date.format(dtf);
}
else {
date_dtf = date_literal.split(":")[0].replace("-", "/").replace("T", "/");
}
}
return folder_name + "/" + date_dtf + "h/" + filename;
}
catch(Exception e) {
LOG.error("ERROR with either foldername or date");
LOG.error("Line : " + input);
LOG.error("folder : " + folder_name);
LOG.error("Date : " + date_dtf);
return folder_name + "/" + date_dtf + "h/" + filename;
}
}
}
А фактическое место, где трубопровод развернут и запущен, можно найти ниже:
public void streamData() {
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")))
.apply(new GenericFunctions.extractMsg())
.apply(FileIO.<String, String>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix()))
.via(TextIO.sink())
.withNumShards(options.getNumShards())
.to(options.getOutputDirectory())
.withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
.withDestinationCoder(StringUtf8Coder.of()));
pipeline.run();
}