Существует топология:
kStreamBuilder.stream(kafkaProperties.getInboundTopicName(), consumed)
.filterNot((k,v) -> Objects.isNull(v))
.transform(() -> new CustomTransformer(...))
.transform(() -> new AnotherTransformer(...))
.to(kafkaProperties.getOutTopicName(), resultProduced);
с настроенным
num.stream.threads: 50
При запуске приложение застревает с постоянно регистрирующимися сообщениями (я не уверен на 100%, что оно зависло, но через 20 минутнет изменений в состоянии и процессоре, использование сети очень высоко):
State transition from RUNNING to PARTITIONS_REVOKED
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-1-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-2-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-3-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-4-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-5-consumer, groupId=group_id] (Re-)joining group
и т. д.
В теме 100 разделов.
Что мы заметили: каждыйТрансформатор использует свой собственный persistentStateStore.После замены в inMemoryStateStore все еще были записаны журналы, но через ~ 3 минуты топология успешно запустилась.
Потоки Kafka версии 2.1.0.Брокерская версия 1.1.0