Увеличение числа рабочих приводит к зависанию задания Dataflow на TextIO.Write - выполняется быстро с использованием DirectRunner - Apache Beam - PullRequest
0 голосов
/ 04 февраля 2020

Эта программа принимает записи из файла, анализирует и сохраняет записи в базе данных и записывает записи об ошибках в корзину Cloud Storage. Используемый мной тестовый файл создает только 3 записи об ошибках - при локальном запуске последний шаг parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath)); выполняется за миллисекунды.

В потоке данных я запускаю процесс с 5 работниками. Процесс бесконечно зависает на этапе записи даже после успешной записи 3 записей об ошибках. Я вижу, что он зависает на этапе WriteFailedRecordsToGCS/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key.out0

Может кто-нибудь дать мне знать, почему это так по-разному ведет себя между DirectRunner и Dataflow? Весь трубопровод ниже.

        StageUtilizationDataSourceOptions options = PipelineOptionsFactory.fromArgs(args).as(StageUtilizationDataSourceOptions.class);
        final TupleTag<Utilization> parsedRecords = new TupleTag<Utilization>("parsedRecords") {};
        final TupleTag<String> failedRecords = new TupleTag<String>("failedRecords") {};
        DrgAnalysisDbStage drgAnalysisDbStage = new DrgAnalysisDbStage(options);
        HashMap<String, Client> clientKeyMap = drgAnalysisDbStage.getClientKeys();

        Pipeline pipeline = Pipeline.create(options);
        PCollectionTuple parseResults = PCollectionTuple.empty(pipeline);

        PCollection<String> records = pipeline.apply("ReadFromGCS", TextIO.read().from(options.getGcsFilePath()));

        if (FileTypes.utilization.equalsIgnoreCase(options.getFileType())) {
             parseResults = records
                    .apply("ConvertToUtilizationRecord", ParDo.of(new ParseUtilizationFile(parsedRecords, failedRecords, clientKeyMap, options.getGcsFilePath()))
                    .withOutputTags(parsedRecords, TupleTagList.of(failedRecords)));
             parseResults.get(parsedRecords).apply("WriteToUtilizationStagingTable", drgAnalysisDbStage.writeUtilizationRecordsToStagingTable());
        } else {
            logger.error("Unrecognized file type provided: " + options.getFileType());
        }

        String failureRecordsPath = Utilities.getFailureRecordsPath(options.getGcsFilePath(), options.getFileType());
        parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));

        pipeline.run().waitUntilFinish();
...