Прерывистый Исключение Файловая система не найдена - PullRequest
0 голосов
/ 24 октября 2019

Мы выполняем пакетное задание на flink, которое считывает данные из GCS и выполняет некоторую агрегацию этих данных. Мы периодически получаем проблему: No filesystem found for scheme gs

Мы работаем с Beam версии 2.15.0 с FlinkRunner, версия Flink: 1.6.4

При удаленной отладке диспетчеров задач мы обнаружили, что внемногие диспетчеры задач GcsFileSystemRegistrar не добавляются в список схем файловой системы. В этих менеджерах задач мы получаем эту проблему.

Коллекция SCHEME_TO_FILESYSTEM модифицируется только в вызове функции setDefaultPipelineOptions в классе org.apache.beam.sdk.io.FileSystems, и эта функция не вызывается, и, следовательно,GcsFileSystemRegistrar не добавляется в SCHEME_TO_FILESYSTEM.

Подробная трассировка стека:

java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Чтобы решить эту проблему, мы попытались вызвать следующую функцию в функции расширения PTransform: FileSystems.setDefaultPipelineOptions (PipelineOptionsFactory):);Этот вызов функции предназначен для того, чтобы убедиться, что GcsFileSystemRegistrar добавлен в список, но это не решило проблему.

Может кто-нибудь помочь в проверке, почему это может происходить и что можно сделать, чтобы решить эту проблему? выпуск.

...