Я пытаюсь поддерживать длительный поток Akka, который разветвляется на подпотоки (скажем, для сопоставления записей / throttle / writeToDB с определенным идентификатором). Поскольку поток должен оставаться живым в течение длительного времени, в какой-то момент поток будет вне доступных подпотоков (и я все равно хотел бы очистить неиспользуемую память).
- Как можно очистить «бездействующие» подпотоки? (Do c указывает на
idleTimeout
и recoverWithRetries
, но мне кажется, что это не освобождает подпоток. Не правильно ли я его использую? Я вижу, что recoverWithRetries
вызывается в нужное время, но следующий MAX_SUBSTREAMS + 1
-й ключ, который прибывает позже, все еще терпит неудачу (Cannot open a new substream as there are too many substreams open
)) - Как обращаться с случаем, который, возможно, не имеет никакого подпотока для очистки? (могу ли я / как замедлить восходящий поток?)
- Эта запись говорит, что
groupBy удаляет входные данные для подпотоков, которые уже были закрыты
Это не то, что я хочу, мне нужно, чтобы в этом случае был заново создан подпоток. Также я не могу найти упоминания об этом поведении в do c.
. В конце концов, мне нужно развернуть поток в пул подпотоков. Если используются все подпотоки, снизьте скорость вверх по течению. Если подпоток не получает никакой новой записи в течение x секунд, испустите, очистите ее и переместите обратно в пул.
Flow.of(Record.class)
.groupBy(MAX_SUBSTREAMS, Record::getKey)
.via(conflateThenThrottleThenCommitRecord)
.idleTimeout(Duration.of(2, ChronoUnit.SECONDS))
.recoverWithRetries(1, new PFBuilder()
.matchAny(ex -> Source.empty())
.build())
.mergeSubstreams();