Мы выполняем пакетное задание на flink, которое считывает данные из GCS и выполняет некоторую агрегацию этих данных. Мы периодически получаем проблему: No filesystem found for scheme gs
Мы работаем с Beam версии 2.15.0 с FlinkRunner, версия Flink: 1.6.4
При удаленной отладке диспетчеров задач мы обнаружили, что внемногие диспетчеры задач GcsFileSystemRegistrar не добавляются в список схем файловой системы. В этих менеджерах задач мы получаем эту проблему.
Коллекция SCHEME_TO_FILESYSTEM модифицируется только в вызове функции setDefaultPipelineOptions в классе org.apache.beam.sdk.io.FileSystems, и эта функция не вызывается, и, следовательно,GcsFileSystemRegistrar не добавляется в SCHEME_TO_FILESYSTEM.
Подробная трассировка стека:
java.lang.IllegalArgumentException: No filesystem found for scheme gs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Чтобы решить эту проблему, мы попытались вызвать следующую функцию в функции расширения PTransform: FileSystems.setDefaultPipelineOptions (PipelineOptionsFactory):);Этот вызов функции предназначен для того, чтобы убедиться, что GcsFileSystemRegistrar добавлен в список, но это не решило проблему.
Может кто-нибудь помочь в проверке, почему это может происходить и что можно сделать, чтобы решить эту проблему? выпуск.