Как назначить пружинный контейнер kafka отдельным разделам и иметь возможность остановить / запустить его? - PullRequest
1 голос
/ 21 января 2020

Мне нужен контейнер, который я могу запустить / остановить. Также в моей программе мне нужно назначить определенные разделы. Но я не могу понять, как достичь обоих условий.

В первом случае Я могу сделать что-то подобное (Извините, мой Kotlin):

val consumerProps = props.consumer.buildProperties()

val containerProps = ContainerProperties(topic)
containerProps.pollTimeout = 1_000

val enableAutoCommit = consumerProps[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]?.let { it as Boolean }
if (enableAutoCommit == null || !enableAutoCommit) {
    containerProps.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
}

val container = ConcurrentMessageListenerContainer(factory, containerProps)
container.setupMessageListener(listener)
container.concurrency = CONCURRENCY

Я не вижу никакой возможности добавить информацию о разделах в этот фрагмент.

Во втором случае Я могу сделать что-то подобное:

    // @Autowired private val listenerContainerFactory: ConcurrentKafkaListenerContainerFactory<String, String>
    val container =  listenerContainerFactory.createContainer(
            partitions.map { TopicPartitionInitialOffset(topic, it.partition()) }
    )

И я не вижу никакой возможности изменить ackMode в этой конфигурации.

1 Ответ

0 голосов
/ 21 января 2020

Нет никакой разницы в конфигурации контейнера независимо от конфигурации topi c (темы, шаблон, тема / разделы).

Вы можете использовать

val container = ConcurrentMessageListenerContainer(factory, containerProps)

с containerProps, созданный с помощью TopicPartitionInitialOffset (вам не нужно использовать фабрику).

Если вам по какой-то причине нужно использовать фабрику, вы можете использовать

container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

после контейнер создан.

...