Как создать RecoverableWriter во Flink для Google Cloud Storage - PullRequest
0 голосов
/ 16 сентября 2018

Я хотел использовать Облачное хранилище Google для записи ( сток ) элементов DataStream из моей потоковой работы с использованием StreamingFileSink.

Для этого я использовал Соединитель облачного хранилища Google для Hadoop в качестве реализации org.apache.hadoop.fs.FileSystem и использовал HadoopFileSystem в качестве реализации org.apache.flink.core.fs.FileSystem, которая переносит hadoop FileSystem класс для Flink.

Я включил следующие зависимости в свой файл Gradle:

  • compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
  • compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
  • provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")

Теперь, насколько я понимаю, глядя на источники [1] [2] [ 3] , Flink динамически загружает реализации FileSystemFactory во время выполнения (через java.util.ServiceLoader), а также загружает HadoopFsFactory во время выполнения (через отражение , если он находит Hadoop в пути к классам), которое затем используется для создания экземпляров FileSystem.

Проблема, с которой я столкнулся, заключалась в том, что по умолчанию RecoverableWriter для пакета совместимости Hadoop поддерживает только файловую схему hdfs (я использую gs) и, следовательно, выдает ошибку во время выполнения .

Итак, я extended HadoopFileSystem (я звонил GCSFileSystem) и @overrided FileSystem#createRecoverableWriter(), чтобы вернуть пользовательскую реализацию RecoverableWriter, которая затем обработайте детали восстановления и т. д., а также создайте соответствующий класс FileSystemFactory (класс отмечен @AutoService и, следовательно, должен быть обнаружен ServiceLoader).

Настройка хорошо работает локально и в локальном докерном кластере (фактически, соединитель GCS выдает ошибку из-за отсутствия авторизации, но это нормально, так как это означает, что FileSystem загружен и работает), но происходит сбой при развертывании это в докер кластер, работающий на Google Compute Engine.

В GCE загружается значение по умолчанию HadoopFileSystem и выдается исключение, поскольку схема имеет вид gs, а не hdfs, но я предполагаю, что он должен был загрузить мою реализацию фабрики, и поэтому эта ошибка не должна не возникло.

Я нахожусь на Flink v1.6.0 и работаю как долго работающий кластер сеанса на Docker, используя docker-flink

1 Ответ

0 голосов
/ 22 сентября 2018

Ответ находится в последней строке ОП !!

Я работал на долгоживущем кластере Session , и к тому времени, когда мой job.jar был выполнен, инициализация FileSystem уже была выполнена, и фабрики уже были загружены! и поэтому, когда я добавил свою работу, не было выполнено инициализации вызовов.

Решение? В зависимости от того, как вы выполняете свою работу, есть несколько способов:

  • Автономный: Добавьте банку, содержащую реализацию FileSystem, в каталог lib/

  • Cluster (manual): Добавьте банку, содержащую реализацию FileSystem, в каталог lib/ вашего zip или изображения или чего-либо еще.

  • Кластер (docker) (long-living): Создайте пользовательский образ контейнера и добавьте jar в каталог lib/ этого образа.

  • Кластер (docker) (per-job-session): Создайте пользовательский образ контейнера и добавьте все банки (содержащие FileSystem, вашу работу и т. Д.) В lib/ directory, подробнее о per-job сессии здесь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...