Невозможно выполнить HTTP-запрос: время ожидания ожидания соединения из пула во Flink - PullRequest
0 голосов
/ 21 июня 2019

Я работаю над приложением , которое загружает некоторые файлы в корзину s3 , а позже читает файлы из корзины s3 и отправляет их в мою базу данных .

Я использую Flink 1.4.2 и fs.s3a API для чтения и записи файлов из корзины s3.

Загрузка файлов в корзину s3 работает нормально без каких-либо проблем, но когда начинается вторая фаза моего приложения, которое читает эти загруженные файлы из s3, мое приложение выдает следующую ошибку :

Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:155)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:702)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at org.apache.flink.api.common.io.GenericCsvInputFormat.open(GenericCsvInputFormat.java:301)
at org.apache.flink.api.java.io.CsvInputFormat.open(CsvInputFormat.java:53)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:160)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:37)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Я был способен контролировать эту ошибку, увеличивая параметр max connection для s3a API.

На данный момент у меня есть около 1000 файлов в корзине s3 , что составляет нажал и потянул мое приложение в корзине s3 и мое максимальное соединение 3000 .Я использую параллелизм Флинка для загрузки / скачивания этих файлов из корзины s3.Мой диспетчер задач насчитывает 14 .Это прерывистый сбой , у меня также есть случаи успеха для этого сценария.

Мой запрос:

  1. Почему я получаю прерывистый сбой?Если максимальное установленное мною соединение было низким, то мое приложение должно выдавать эту ошибку при каждом запуске.
  2. Есть ли способ рассчитать оптимальное число макс. Соединений, необходимое для того, чтобы приложение работало без обращения к соединению?ошибка тайм-аута пула?Или эта ошибка связана с чем-то еще, о чем я не знаю?

Заранее спасибо

1 Ответ

0 голосов
/ 21 июня 2019

Некоторые комментарии, основанные на моем опыте обработки большого количества файлов из S3 с помощью рабочих процессов Flink (batch):

  1. Когда вы читаете файлы, Flink будет вычислять «разбиения» на основе числафайлов и размер каждого файла.Каждое разделение читается отдельно, поэтому теоретическое максимальное количество одновременных подключений зависит не от количества файлов, а от комбинации файлов и размеров файлов.
  2. Пул соединений, используемый клиентом HTTP, освобождает соединения посленекоторое время, поскольку возможность повторного использования существующего соединения является выигрышем (рукопожатие сервера / клиента не должно происходить).Таким образом, вводится некоторая степень случайности в число доступных соединений в пуле.
  3. Размер пула соединений не сильно влияет на память, поэтому я обычно устанавливаю его довольно высоким (например, 4096 для недавнего рабочего процесса).).
  4. При использовании кода подключения AWS значение bump равно fs.s3.maxConnections, что не совпадает с чистой конфигурацией Hadoop.
...