Когда использовать ConcurrentKafkaListenerContainerFactory? - PullRequest
1 голос
/ 06 марта 2019

Я новичок в kafka и прошел документацию , но ничего не понял.Может кто-нибудь объяснить, когда использовать класс ConcurrentKafkaListenerContainerFactory?Я использовал класс Kafkaconsumer, но вижу, что ConcurrentKafkaListenerContainerFactory используется в моем текущем проекте.Пожалуйста, объясните, для чего он служит.

Ответы [ 2 ]

1 голос
/ 07 марта 2019

Потребитель Kafka НЕ является потокобезопасным. Весь сетевой ввод / вывод происходит в потоке приложения, выполняющего вызов. Пользователь несет ответственность за обеспечение правильной синхронизации многопоточного доступа. Несинхронизированный доступ приведет к ConcurrentModificationException.

Если потребителю назначено несколько разделов для извлечения данных, он будет пытаться использовать их все одновременно, эффективно предоставляя этим разделам одинаковый приоритет для потребления. Однако в некоторых случаях потребители могут сначала сосредоточиться на извлечении из некоторого подмножества назначенных разделов на полной скорости, и начинать выборку других разделов только тогда, когда эти разделы имеют мало данных или вообще не используют их.

Spring-Кафка

ConcurrentKafkaListenerContainerFactory используется для создания контейнеров для аннотированных методов с @KafkaListener

Есть две MessageListenerContainer весной кафки

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

KafkaMessageListenerContainer получает все сообщения от всех тем или разделов в одной теме. ConcurrentMessageListenerContainer делегирует один или несколько экземпляров KafkaMessageListenerContainer для обеспечения многопоточного потребления.

Использование ConcurrentMessageListenerContainer

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

Имеет свойство параллелизма. Например, container.setConcurrency (3) создает три KafkaMessageListenerContainer экземпляра.

Если у вас есть шесть TopicPartition экземпляров, предоставляется и параллелизм равен 3; каждый контейнер получает два раздела. Для пяти экземпляров TopicPartition два контейнера получают два раздела, а третий получает один. Если параллелизм больше, чем число TopicPartitions, параллелизм уменьшается, так что каждый контейнер получает один раздел.

вот яркий пример с документацией здесь

1 голос
/ 06 марта 2019

Kafka Consumer API не является поточно-ориентированным.API-интерфейс ConcurrentKafkaListenerContainerFactory обеспечивает одновременный способ использования API-интерфейса Kafka Consumer наряду с настройкой других свойств-потребителей Kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...