Flink Had oop Bucketing Sink с множеством параллельных ведер - PullRequest
1 голос
/ 07 февраля 2020

Я изучаю характеристики работы Flink, которая переносит данные из Kafka в S3 Sink. Мы используем BucketingSink для записи файлов паркета. Логика сегментирования c делит сообщения, имеющие папку, по типу данных, арендатору (клиенту), дате и времени, идентификатору извлечения и т. Д. c и т. c. В результате каждый файл сохраняется в структуре папок, состоящей из 9-10 слоев (s3_bucket: / 1/2/3/4/5/6/7/8/9 / myFile ...)

Если данные распределяются в виде пакетов сообщений для типа арендатора, мы видим хорошую производительность при записи, но когда данные в большей степени распространяются в виде белого шума по тысячам арендаторов, десяткам типов данных и множественным идентификаторам извлечения, мы получаем невероятную потерю выступления. (порядка 300 раз)

При подключении отладчика кажется, что проблема связана с количеством одновременно открытых на S3 обработчиков для записи данных. Более конкретно: profiling execution

Исследование библиотек oop, используемых для записи в S3, обнаружило несколько возможных улучшений:

      <name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>

Но ничего из них большая разница в пропускной способности. Я также попытался сгладить структуру папок для записи в один ключ, например (1_2_3 _...), но также это не принесло никаких улучшений.

Примечание: тесты были выполнены на Flink 1.8 с Had oop FileSystem (BucketingSink), запись в S3 с использованием библиотек oop fs 2.6.x (поскольку мы используем Cloudera CDH 5.x для точек сохранения), поэтому мы не можем переключиться на StreamingFileSink.

1 Ответ

2 голосов
/ 07 февраля 2020

После предложения Костаса в https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3Cdev.flink.apache.org%3E

Виновником замедления является фрагмент кода: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543 -L551

Это само по себе занимает около 4-5 секунд, в общей сложности 6 секунд, чтобы открыть файл. Журналы от инструментированного вызова:

2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

Это вместе с настройкой по умолчанию для группирования приемника с опрокидыванием бездействия 60 секунд https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195 означает, что при наличии более 10 параллельных сегментов в слоте к тому времени, когда мы закончили sh, создавая последний бак, первый стал устаревшим, поэтому его нужно повернуть, создав ситуацию блокировки.

Мы решили это, заменив BucketingSink.java и удалив проверку FS, упомянутую выше. :

        LOG.debug("Opening new part file FS verification");
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        LOG.debug("Opening new part file FS verification - done");

, поскольку мы видим, что приемник без него работает нормально, теперь открытие файла занимает ~ 1.2se c.

Более того, мы устанавливаем порог неактивности по умолчанию равным 5 минутам. С этими изменениями мы можем легко обрабатывать более 200 блоков на слот (как только задание наберет скорость, оно будет загружаться во все слоты, поэтому откладывает неактивное время ожидания)

...