Я хотел использовать Облачное хранилище Google для записи ( сток ) элементов DataStream
из моей потоковой работы с использованием StreamingFileSink
.
Для этого я использовал Соединитель облачного хранилища Google для Hadoop в качестве реализации org.apache.hadoop.fs.FileSystem
и использовал HadoopFileSystem
в качестве реализации org.apache.flink.core.fs.FileSystem
, которая переносит hadoop FileSystem класс для Flink.
Я включил следующие зависимости в свой файл Gradle:
compile(
"com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2"
)
compile(
"org.apache.flink:flink-connector-filesystem_2.11:1.6.0"
)
provided(
"org.apache.flink:flink-shaded-hadoop2:1.6.0"
)
Теперь, насколько я понимаю, глядя на источники [1] [2] [ 3] , Flink динамически загружает реализации FileSystemFactory
во время выполнения (через java.util.ServiceLoader
), а также загружает HadoopFsFactory
во время выполнения (через отражение , если он находит Hadoop в пути к классам), которое затем используется для создания экземпляров FileSystem
.
Проблема, с которой я столкнулся, заключалась в том, что по умолчанию RecoverableWriter
для пакета совместимости Hadoop поддерживает только файловую схему hdfs
(я использую gs
) и, следовательно, выдает ошибку во время выполнения .
Итак, я extended
HadoopFileSystem
(я звонил GCSFileSystem
) и @overrided
FileSystem#createRecoverableWriter()
, чтобы вернуть пользовательскую реализацию RecoverableWriter
, которая затем обработайте детали восстановления и т. д., а также создайте соответствующий класс FileSystemFactory
(класс отмечен @AutoService
и, следовательно, должен быть обнаружен ServiceLoader
).
Настройка хорошо работает локально и в локальном докерном кластере (фактически, соединитель GCS выдает ошибку из-за отсутствия авторизации, но это нормально, так как это означает, что FileSystem
загружен и работает), но происходит сбой при развертывании это в докер кластер, работающий на Google Compute Engine.
В GCE загружается значение по умолчанию HadoopFileSystem
и выдается исключение, поскольку схема имеет вид gs
, а не hdfs
, но я предполагаю, что он должен был загрузить мою реализацию фабрики, и поэтому эта ошибка не должна не возникло.
Я нахожусь на Flink v1.6.0 и работаю как долго работающий кластер сеанса на Docker, используя docker-flink