Flink: обратное давление (источник: kafka, раковина :asticsearch) - PullRequest
0 голосов
/ 27 января 2019

У меня есть задание по быстрому чтению, которое читает данные из Kafka, выполняет определенные агрегации и записывает результаты в индексы эластичного поиска. Я вижу высокое противодавление на источнике. Высокое противодавление приводит к медленному чтению данных из Kafka, я вижу, что данные помещаются в очередь в сетевом стеке (netstat RecvQ показывает десятки тысяч байтов данных, застрявших в исходных соединениях kafka, данные в конечном итоге читаются), что в свою очередь вызывает данные, которые будут погружены в эластичный поиск после задержки, и эта задержка продолжает увеличиваться.

Источник производит ~ 17 500 записей в минуту, задание Flink назначает метки времени (события) для каждой входящей записи, выполняет 12 различных типов keyBy, помещает события в одно-минутное падающее окно, выполняет операции агрегации в этом потоке окон с ключами. и, наконец, записывает результаты в 12 различных индексов эластичного поиска (каждая запись является вставкой).

Проблема в том, что данные, записываемые вasticsearch, начинают отставать, поэтому результаты панели мониторинга (построенные поверх эластичного поиска) больше не отображаются в реальном времени. Я понимаю, что это происходит из-за увеличения противодавления. Не уверен, как решить эту проблему. Сам кластер представляет собой отдельный кластер на основе виртуальной машины с одним узлом, с 64 ГБ ОЗУ (диспетчер задач настроен на использование 20 ГБ) и 16 виртуальными ЦП. Нет никаких свидетельств (как видно из htop) о том, что процессор или память ограничены. Имеется только один диспетчер задач, и это единственное задание на мгновение в этом кластере.

Я не уверен, связана ли эта проблема с проблемой локальных ресурсов в кластере или из-за медленной записи вasticsearch. Я установил setBulkFlushMaxActions на 1 (как это делается во всех примерах кода, которые я видел где-либо), нужно ли мне также устанавливать setBulkFlushInterval и / или setBulkFlushMaxSizeinMB?

Я прошел https://www.da -platform.com / flink-forward-berlin / resources / улучшая пропускную способность и задержку с flinks-network-stack , но еще не пробовал настраивать параметры, перечисленные на слайде 19, не уверены, какие значения установить для этих параметров.

Наконец, я не думаю, что вижу ту же проблему при запуске одного и того же задания из IntelliJ IDE.

Я собираюсь исключить все агрегации, а затем добавить их один за другим, чтобы посмотреть, существует ли конкретная агрегация, вызывающая эту проблему?

Любые конкретные указатели будут высоко оценены, также попробуйте setBulkFlushInterval и setBulkFlushMaxSizeinMB.

Обновление 1, 01/29/2019 Похоже, что оба узла работают с очень высокой кучей, поэтому GC постоянно работает, пытаясь освободить место в JVM. Будет увеличение физической памяти с 16 до 32 ГБ, а затем перезагрузить узлы. Это должно, надеюсь, решить проблему, узнаем через 24 часа.

Ответы [ 2 ]

0 голосов
/ 03 февраля 2019

Проблема была решена путем увеличения (удвоения) оперативной памяти на узлах кластера elasticearch и установки интервала обновления индекса (для всех индексов эластичного поиска) до 30 с (по умолчанию 1 с).После внесения этих изменений противодавление во флинке сообщается как нормальное, задержки данных нет, и все выглядит превосходно.

0 голосов
/ 27 января 2019

Обычно в подобных случаях проблема заключается в подключении к внешнему хранилищу данных - либо недостаточная пропускная способность, либо синхронная запись для каждой записи, а не пакетная запись.

Один простой способ проверить, что проблема в приемнике эластичного поиска (а не, скажем, в конфигурации сетевого стека), состоит в том, чтобы заменить его отказавшим приемником (который просто ничего не делает), чтобы посмотреть, решит ли это проблему , Что-то вроде

public static class NullSink<OUT> implements SinkFunction<OUT> {
    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}

Обновление:

Проблема в том, что вы установили параметру bulk.flush.max.actions в 1, предотвращая любую буферизацию в соединении с серверомasticsearch.

...