Как контролировать / приостанавливать поток потоков akka в sub source / stream - PullRequest
0 голосов
/ 17 декабря 2018

Мы используем потоки akka с alpakka kafka для получения сообщений из разных тем, используя подписку на тему.Подисточники / потоки создаются для каждой темы с помощью groupBy ().
Как мы приостанавливаем / возобновляем / контролируем обработку сообщений для одной темы, не влияя на обработку сообщений для других тем.

Ниже параметрыне работает.
Использование RestartSource перезапустит источник и повлияет на обработку всех тем.
Использование RestartFlow удалит сбойные элементы, а также повлияет на обработку всех тем.
Создание источника для каждой темы - это ресурсинтенсивно, если мы должны перезапустить источники для всех тем и имеет дополнительные накладные расходы.

Consumer
    .committableSource(consumerSettings.withGroupId("test-group"),
            Subscriptions.topicPattern("/some/pattern"))
    .groupBy(5000, msg -> msg.record().topic())
    .mapAsync(1, msg -> business(msg).thenApply(response -> {
        if (response.status().isFailure()) {
            throw new RuntimeException("Boom");
        }
        return msg.committableOffset();}))
    .mapAsync(1, offset -> offset.commitJavadsl());
...