Распределение тематических разделов по concurrentMessageListenerContainer - PullRequest
0 голосов
/ 27 марта 2019

Я настраиваю ConcurrentMessageListenerContainer

<bean class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" id="messageListenerContainer">
        <constructor-arg index="0" ref="consumerFactory"/>
        <constructor-arg index="1" ref="containerProperties"/>
        <property name="concurrency" value="2"/>
    </bean>

ConsumerFactory использует эту конфигурацию:

    <util:map id="consumerConfig" map-class="java.util.HashMap">
        <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).BOOTSTRAP_SERVERS_CONFIG}"
               value="${rp.kafka.bootstrap.servers}"/>
        <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).KEY_DESERIALIZER_CLASS_CONFIG}"
               value="org.apache.kafka.common.serialization.StringDeserializer"/>
        <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).VALUE_DESERIALIZER_CLASS_CONFIG}"
               value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
        <entry key="#{T(org.springframework.kafka.support.serializer.JsonDeserializer).TRUSTED_PACKAGES}"
               value="*"/>
        <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).PARTITION_ASSIGNMENT_STRATEGY_CONFIG}"
               value="org.apache.kafka.clients.consumer.RoundRobinAssignor"/>
        <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).ENABLE_AUTO_COMMIT_CONFIG}"
               value="false"/>
    </util:map>

, а свойства контейнера -

    <bean class="org.springframework.kafka.listener.ContainerProperties" id="containerProperties">
        <constructor-arg>
            <list>
                <value>sendSMS</value>
            </list>
        </constructor-arg>
        <property name="groupId" value="main"/>
        <property name="messageListener" ref="messageListener"/>
        <property name="ackMode" value="RECORD"/>
    </bean>

Моя тема "sendSMS"имеет 5 разделов в 3-узловом кластере с коэффициентом повторения 3, поэтому я ожидаю, что каждый KafkaMessageListenerContainer, созданный одним Concurrent (всего 2 в этом случае), будет обрабатывать свою часть разделов.Однако после запуска приложения я вижу в своем окне отладчика, что каждый слушатель обрабатывает все 5!разделы https://gyazo.com/183626ff60061b471858f8cc52573353 и сообщение от 4-го раздела (там, где у меня есть сообщение, которое останавливает обработку и не фиксируется после перезапуска, но не относится к этой проблеме) с тем же смещением, доставляется 2 раза вразные темы с разными потребителями!Почему так происходит?Это ошибка или ожидаемое поведение?

1 Ответ

0 голосов
/ 27 марта 2019

Вы не показываете достаточно информации.Параллельный контейнер агрегирует назначенные разделы для дочерних элементов KafkaListenerContainer s (по одному для каждого параллелизма).

@Override
public Collection<TopicPartition> getAssignedPartitions() {
    return this.containers.stream()
            .map(KafkaMessageListenerContainer::getAssignedPartitions)
            .filter(Objects::nonNull)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
}

Необходимо отобразить журналы для повторной доставки;включите ведение журнала отладки для получения дополнительной информации.

...