Мы используем потоки akka с alpakka kafka для получения сообщений из разных тем, используя подписку на тему.Подисточники / потоки создаются для каждой темы с помощью groupBy ().
Как мы приостанавливаем / возобновляем / контролируем обработку сообщений для одной темы, не влияя на обработку сообщений для других тем.
Ниже параметрыне работает.
Использование RestartSource перезапустит источник и повлияет на обработку всех тем.
Использование RestartFlow удалит сбойные элементы, а также повлияет на обработку всех тем.
Создание источника для каждой темы - это ресурсинтенсивно, если мы должны перезапустить источники для всех тем и имеет дополнительные накладные расходы.
Consumer
.committableSource(consumerSettings.withGroupId("test-group"),
Subscriptions.topicPattern("/some/pattern"))
.groupBy(5000, msg -> msg.record().topic())
.mapAsync(1, msg -> business(msg).thenApply(response -> {
if (response.status().isFailure()) {
throw new RuntimeException("Boom");
}
return msg.committableOffset();}))
.mapAsync(1, offset -> offset.commitJavadsl());