Как объединить субпотоки Акка - PullRequest
1 голос
/ 25 марта 2020

Я пытаюсь поддерживать длительный поток Akka, который разветвляется на подпотоки (скажем, для сопоставления записей / throttle / writeToDB с определенным идентификатором). Поскольку поток должен оставаться живым в течение длительного времени, в какой-то момент поток будет вне доступных подпотоков (и я все равно хотел бы очистить неиспользуемую память).

  1. Как можно очистить «бездействующие» подпотоки? (Do c указывает на idleTimeout и recoverWithRetries, но мне кажется, что это не освобождает подпоток. Не правильно ли я его использую? Я вижу, что recoverWithRetries вызывается в нужное время, но следующий MAX_SUBSTREAMS + 1 -й ключ, который прибывает позже, все еще терпит неудачу (Cannot open a new substream as there are too many substreams open))
  2. Как обращаться с случаем, который, возможно, не имеет никакого подпотока для очистки? (могу ли я / как замедлить восходящий поток?)
  3. Эта запись говорит, что

    groupBy удаляет входные данные для подпотоков, которые уже были закрыты

Это не то, что я хочу, мне нужно, чтобы в этом случае был заново создан подпоток. Также я не могу найти упоминания об этом поведении в do c.

. В конце концов, мне нужно развернуть поток в пул подпотоков. Если используются все подпотоки, снизьте скорость вверх по течению. Если подпоток не получает никакой новой записи в течение x секунд, испустите, очистите ее и переместите обратно в пул.

Flow.of(Record.class)
    .groupBy(MAX_SUBSTREAMS, Record::getKey)
    .via(conflateThenThrottleThenCommitRecord)
    .idleTimeout(Duration.of(2, ChronoUnit.SECONDS))
    .recoverWithRetries(1, new PFBuilder()
        .matchAny(ex -> Source.empty())
        .build())
    .mergeSubstreams();
...