Голодание одного из 2-х потоков в ConnectedStreams - PullRequest
0 голосов
/ 07 марта 2019

Фон

У нас есть 2 потока, назовем их A и B.Они производят элементы a и b соответственно.

Поток A создает элементы с низкой скоростью (один раз в минуту).

Поток B получает один элемент один раз в каждый2 недели.Он использует функцию flatMap, которая получает этот элемент и генерирует ~ 2 миллиона b элементов в цикле:

(Java)

for (BElement value : valuesList) {
    out.collect(updatedTileMapVersion);
}

valueList здесь содержит ~ 2миллион b элементов

Мы соединяем эти потоки (A и B), используя connect, набираем ключом и выполняем еще один flatMap для подключенного потока:

streamA.connect(streamB).keyBy(AClass::someKey, BClass::someKey).flatMap(processConnectedStreams)

Каждый из b элементов имеет свой ключ, то есть ~ 2 миллиона ключей поступают из потока B.

Проблема

То, что мы видим, - это голодание.Хотя есть a элементов, готовых к обработке, они не обрабатываются в processConnectedStreams.

Наши попытки решить проблему

Мы попытались уменьшить поток B до 10элементов в 1 секунду, выполнив Thread.sleep() каждые 10 элементов:

long totalSent = 0;
for (BElement value : valuesList) {
    totalSent++;
    out.collect(updatedTileMapVersion);
    if (totalSent % 10 == 0) {
        Thread.sleep(1000)
    }
}

processConnectedStreams имитируется, чтобы занять 1 секунду с другим Thread.sleep(), и мы попробовали это с: * Установка параллелизма10 для всех конвейеров - не работает * Установка параллелизма 15 для всех конвейеров - действительно работает

Вопрос

Мы не хотим использовать все эти ресурсы, так как stream B активируется очень редко, а для потока A элементы, имеющие высокий параллелизм, являются излишним.Можно ли решить эту проблему, не устанавливая параллелизм больше, чем количество b элементов, которые мы посылаем каждую секунду?

1 Ответ

0 голосов
/ 07 марта 2019

Было бы полезно, если бы вы указали полную топологию рабочего процесса.Например, вы не упомянули о каком-либо вводе ключа или случайном разделении данных.Если это действительно так, то Flink собирается направить несколько операций в одной задаче, что может (в зависимости от топологии) привести к проблеме, с которой вы сталкиваетесь.

Если это так, то принудительное разбиение доprocessConnectedStreams может помочь, так как тогда эта операция будет читать из сетевых буферов.

...