Flink StreamingFileSink на Azure хранилище BLOB-объектов - PullRequest
2 голосов
/ 16 апреля 2020

Я пытаюсь подключить StreamingFileSink к Azure Blob Storage. В настоящее время нет никаких упоминаний о Azure в документации , но я надеялся, что она будет работать с абстракцией файловой системы.

После анализа ошибки я предполагаю, что эта функция выходит за рамки прямо сейчас для Azure хранилища BLOB-объектов.

Теперь я хотел бы быть уверен, что я не допустил ошибок и был бы признателен за любые указатели, если есть способ заставить его работать.

Пока я узнал:

Это исключение, которое я вижу:

java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter                                                            
    at org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)                                         
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:129)              
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)                                     
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)                                                           
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:288)               
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:402)                              
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)                                
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)                              
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)                               
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)                                    
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)                                                    
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)                                                      
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)                                                               
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)                                                                     
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)                                                                                      
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)                                                                                        
    at java.lang.Thread.run(Thread.java:748)                                                                                                               

Теперь, похоже, отсутствует метод класса HadoopRecoverableWriter

Но в репозитории flink- azure -fs-had oop я могу найти это в pom. xml

<!-- shade Flink's Hadoop FS adapter classes  -->
<relocation>
    <pattern>org.apache.flink.runtime.fs.hdfs</pattern>
    <shadedPattern>org.apache.flink.fs.azure.common.hadoop</shadedPattern>
</relocation>

И затененный пакет включает этот класс здесь .

Теперь я посмотрел содержимое flink- azure -fs-had oop -1.10.0.jar и затененного пакета, а RecoverableWriter отсутствует :

HadoopBlockLocation.class
HadoopDataInputStream.class
HadoopDataOutputStream.class
HadoopFileStatus.class
HadoopFileSystem.class
HadoopFsFactory.class
HadoopFsRecoverable.class

Больше копаний показывает, что на самом деле в помпе есть фильтр. xml часть тени, которая исключает RecoverableWriter.

<filter>
    <artifact>org.apache.flink:flink-hadoop-fs</artifact>
    <excludes>
        <exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
        <exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
    </excludes>
</filter>
...