Я использую облачный поток данных Google, чтобы выполнить какое-либо преобразование
Я перебираю около 3 миллионов записей из GBQ и выполняю преобразование и записываю результат преобразования в GCS.
При выполнении этой операции поток данных не выполняется.с ошибкой Ошибка: Отключение JVM после 8 последовательных периодов измерения ГХ
Рабочий процесс не выполнен.Причины: S20: Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / GroupByKey / Чтение + Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / GroupByKey / GroupByWindow + Чтение GBQ / Reshuffle.ViaRandomKey / Reshuffle / ResueRueRueVueRueRueVueRueRueRueRueRueRueRueRueRueЗначения / Карта + Чтение GBQ / Чтение файлов + Чтение GBQ / PassThroughThenCleanup / ParMultiDo (Identity) + Чтение GBQ / PassThroughThenCleanup / View.AsIterable / ParDo (ToIsmRecordForGlobalWindow) + преобразование + разделение результатов / ParMultiDo (Partition) + WriteBlobalFoneBoInFloboFlindGoLoveLogFliveGoToLESFLOWLOWLOGLOWLOLLEWLOGWindow.Assign + Ошибки записи / WriteFiles / WriteShardedBundlesToTempFiles / ApplyShardingKey + Ошибки записи / WriteFiles / WriteShardedBundlesToTempFiles / GroupIntoShards / Reify + Записи ошибок / WriteFiles / WriteShardedBundlesToTempiles/ WriteShardedBundlesToTempFiles / GroupIntoShards / Reify + Запись сущностей Gzip / WriteFiles / WriteShardedBundlesToTempFiles / GroupIntoShards / Запись не выполнена. Рабочий элемент был4 раза безуспешно.Каждый раз работник со временем терял связь со службой.Попытка рабочего элемента:
Параметры DataConverterOptions = PipelineOptionsFactory.fromArgs (args) .withValidation () .as (DataConverterOptions.class);Pipeline p = Pipeline.create (опции);
EntityCreatorFn entityCreatorFn = EntityCreatorFn.newWithGCSMapping(options.getMapping(),
options.getWithUri(), options.getLineNumberToResult(), options.getIsPartialUpdate(), options.getQuery() != null);
PCollectionList<String> resultByType =
p.apply("Read GBQ", BigQueryIO.read(
(SchemaAndRecord elem) -> elem.getRecord().get("lineNumber") + "|" + elem.getRecord().get("sourceData"))
.fromQuery(options.getQuery()).withoutValidation()
.withCoder(StringUtf8Coder.of()).withTemplateCompatibility()).apply("transform",ParDo.of(entityCreatorFn))
.apply("Split results",Partition.of(2, (Partition.PartitionFn<String>) (elem, numPartitions) -> {
if (elem.startsWith(PREFIX_ERROR)) {
return PARTITION_ERROR;
}
return PARTITION_SUCCESS;
}));
FileIO.Sink sink = TextIO.sink();
resultByType.get(0).apply("Write entities Gzip", FileIO.write().to(options.getOutput()).withCompression(Compression.GZIP).withNumShards(options.getShards()).via(sink));
resultByType.get(1).apply("Write errors", TextIO.write().to(options.getErrorOutput()).withoutSharding());
p.run();
Отключение JVM после 8 последовательных периодов измерения ГХ.Память используется / всего / max = 109/301/2507 МБ, GC last / max = 54,00 / 54,00%, # откатов = 0, gc thrashing = true.