Эта программа принимает записи из файла, анализирует и сохраняет записи в базе данных и записывает записи об ошибках в корзину Cloud Storage. Используемый мной тестовый файл создает только 3 записи об ошибках - при локальном запуске последний шаг parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));
выполняется за миллисекунды.
В потоке данных я запускаю процесс с 5 работниками. Процесс бесконечно зависает на этапе записи даже после успешной записи 3 записей об ошибках. Я вижу, что он зависает на этапе WriteFailedRecordsToGCS/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key.out0
Может кто-нибудь дать мне знать, почему это так по-разному ведет себя между DirectRunner и Dataflow? Весь трубопровод ниже.
StageUtilizationDataSourceOptions options = PipelineOptionsFactory.fromArgs(args).as(StageUtilizationDataSourceOptions.class);
final TupleTag<Utilization> parsedRecords = new TupleTag<Utilization>("parsedRecords") {};
final TupleTag<String> failedRecords = new TupleTag<String>("failedRecords") {};
DrgAnalysisDbStage drgAnalysisDbStage = new DrgAnalysisDbStage(options);
HashMap<String, Client> clientKeyMap = drgAnalysisDbStage.getClientKeys();
Pipeline pipeline = Pipeline.create(options);
PCollectionTuple parseResults = PCollectionTuple.empty(pipeline);
PCollection<String> records = pipeline.apply("ReadFromGCS", TextIO.read().from(options.getGcsFilePath()));
if (FileTypes.utilization.equalsIgnoreCase(options.getFileType())) {
parseResults = records
.apply("ConvertToUtilizationRecord", ParDo.of(new ParseUtilizationFile(parsedRecords, failedRecords, clientKeyMap, options.getGcsFilePath()))
.withOutputTags(parsedRecords, TupleTagList.of(failedRecords)));
parseResults.get(parsedRecords).apply("WriteToUtilizationStagingTable", drgAnalysisDbStage.writeUtilizationRecordsToStagingTable());
} else {
logger.error("Unrecognized file type provided: " + options.getFileType());
}
String failureRecordsPath = Utilities.getFailureRecordsPath(options.getGcsFilePath(), options.getFileType());
parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));
pipeline.run().waitUntilFinish();