Фон
У нас есть 2 потока, назовем их A
и B
.Они производят элементы a
и b
соответственно.
Поток A
создает элементы с низкой скоростью (один раз в минуту).
Поток B
получает один элемент один раз в каждый2 недели.Он использует функцию flatMap
, которая получает этот элемент и генерирует ~ 2 миллиона b
элементов в цикле:
(Java)
for (BElement value : valuesList) {
out.collect(updatedTileMapVersion);
}
valueList
здесь содержит ~ 2миллион b
элементов
Мы соединяем эти потоки (A
и B
), используя connect
, набираем ключом и выполняем еще один flatMap
для подключенного потока:
streamA.connect(streamB).keyBy(AClass::someKey, BClass::someKey).flatMap(processConnectedStreams)
Каждый из b
элементов имеет свой ключ, то есть ~ 2 миллиона ключей поступают из потока B
.
Проблема
То, что мы видим, - это голодание.Хотя есть a
элементов, готовых к обработке, они не обрабатываются в processConnectedStreams
.
Наши попытки решить проблему
Мы попытались уменьшить поток B
до 10элементов в 1 секунду, выполнив Thread.sleep()
каждые 10 элементов:
long totalSent = 0;
for (BElement value : valuesList) {
totalSent++;
out.collect(updatedTileMapVersion);
if (totalSent % 10 == 0) {
Thread.sleep(1000)
}
}
processConnectedStreams
имитируется, чтобы занять 1 секунду с другим Thread.sleep()
, и мы попробовали это с: * Установка параллелизма10 для всех конвейеров - не работает * Установка параллелизма 15 для всех конвейеров - действительно работает
Вопрос
Мы не хотим использовать все эти ресурсы, так как stream B
активируется очень редко, а для потока A
элементы, имеющие высокий параллелизм, являются излишним.Можно ли решить эту проблему, не устанавливая параллелизм больше, чем количество b
элементов, которые мы посылаем каждую секунду?