Мне нужен контейнер, который я могу запустить / остановить. Также в моей программе мне нужно назначить определенные разделы. Но я не могу понять, как достичь обоих условий.
В первом случае Я могу сделать что-то подобное (Извините, мой Kotlin):
val consumerProps = props.consumer.buildProperties()
val containerProps = ContainerProperties(topic)
containerProps.pollTimeout = 1_000
val enableAutoCommit = consumerProps[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]?.let { it as Boolean }
if (enableAutoCommit == null || !enableAutoCommit) {
containerProps.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
}
val container = ConcurrentMessageListenerContainer(factory, containerProps)
container.setupMessageListener(listener)
container.concurrency = CONCURRENCY
Я не вижу никакой возможности добавить информацию о разделах в этот фрагмент.
Во втором случае Я могу сделать что-то подобное:
// @Autowired private val listenerContainerFactory: ConcurrentKafkaListenerContainerFactory<String, String>
val container = listenerContainerFactory.createContainer(
partitions.map { TopicPartitionInitialOffset(topic, it.partition()) }
)
И я не вижу никакой возможности изменить ackMode
в этой конфигурации.