Облачный поток данных Google: отключение JVM после 8 последовательных периодов измерения ГХ - PullRequest
0 голосов
/ 03 апреля 2019

Я использую облачный поток данных Google, чтобы выполнить какое-либо преобразование

Я перебираю около 3 миллионов записей из GBQ и выполняю преобразование и записываю результат преобразования в GCS.

При выполнении этой операции поток данных не выполняется.с ошибкой Ошибка: Отключение JVM после 8 последовательных периодов измерения ГХ

Рабочий процесс не выполнен.Причины: S20: Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / GroupByKey / Чтение + Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / GroupByKey / GroupByWindow + Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / ResueRueRueVueRueRueVueRueRueRueRueRueRueRueRueRueЗначения / Карта + Чтение GBQ / Чтение файлов + Чтение GBQ / PassThroughThenCleanup / ParMultiDo (Identity) + Чтение GBQ / PassThroughThenCleanup / View.AsIterable / ParDo (ToIsmRecordForGlobalWindow) + преобразование + разделение результатов / ParMultiDo (Partition) + WriteBlobalFoneBoInFloboFlindGoLoveLogFliveGoToLESFLOWLOWLOGLOWLOLLEWLOGWindow.Assign + Ошибки записи / WriteFiles / WriteShardedBundlesToTempFiles / ApplyShardingKey + Ошибки записи / WriteFiles / WriteShardedBundlesToTempFiles / GroupIntoShards / Reify + Записи ошибок / WriteFiles / WriteShardedBundlesToTempiles/ WriteShardedBundlesToTempFiles / GroupIntoShards / Reify + Запись сущностей Gzip / WriteFiles / WriteShardedBundlesToTempFiles / GroupIntoShards / Запись не выполнена. Рабочий элемент был4 раза безуспешно.Каждый раз работник со временем терял связь со службой.Попытка рабочего элемента:

Параметры DataConverterOptions = PipelineOptionsFactory.fromArgs (args) .withValidation () .as (DataConverterOptions.class);Pipeline p = Pipeline.create (опции);

    EntityCreatorFn entityCreatorFn = EntityCreatorFn.newWithGCSMapping(options.getMapping(),
            options.getWithUri(), options.getLineNumberToResult(), options.getIsPartialUpdate(), options.getQuery() != null);
    PCollectionList<String> resultByType =
            p.apply("Read GBQ", BigQueryIO.read(
                    (SchemaAndRecord elem) -> elem.getRecord().get("lineNumber") + "|" + elem.getRecord().get("sourceData"))
                    .fromQuery(options.getQuery()).withoutValidation()
                    .withCoder(StringUtf8Coder.of()).withTemplateCompatibility()).apply("transform",ParDo.of(entityCreatorFn))
                    .apply("Split results",Partition.of(2, (Partition.PartitionFn<String>) (elem, numPartitions) -> {
                        if (elem.startsWith(PREFIX_ERROR)) {
                            return PARTITION_ERROR;
                        }
                        return PARTITION_SUCCESS;
                    }));
    FileIO.Sink sink = TextIO.sink();
    resultByType.get(0).apply("Write entities Gzip", FileIO.write().to(options.getOutput()).withCompression(Compression.GZIP).withNumShards(options.getShards()).via(sink));
    resultByType.get(1).apply("Write errors", TextIO.write().to(options.getErrorOutput()).withoutSharding());
    p.run();

Отключение JVM после 8 последовательных периодов измерения ГХ.Память используется / всего / max = 109/301/2507 МБ, GC last / max = 54,00 / 54,00%, # откатов = 0, gc thrashing = true.

1 Ответ

0 голосов
/ 04 апреля 2019

Может ли EntityCreatorFn.newWithGCSMapping кэшировать элементы в памяти случайно? Похоже, что один из шагов в вашем конвейере потребляет слишком много памяти (обратите внимание, что Dataflow не может распараллелить обработку одного элемента DoFn). Я предлагаю настроить ваш конвейер или опробовать машины с высокими ставками. Если проблема не устранена, обратитесь в Службу поддержки Google Cloud с соответствующими идентификаторами вакансий и т. Д.

...