Ваш вопрос довольно общий, поскольку не очень понятно, какую проблему вы пытаетесь решить, поэтому сложно понять, есть ли лучший способ реализовать решение.
В любом случае давайте начнем с того, что насколько я понимаю, вы ищете Selective Consumer (EIP), который не поддерживается Kafka и Consumer API. Селективный потребитель может выбрать, какое сообщение выбрать из очереди или topi c, основываясь на указанных значениях селекторов c, которые заранее выставляются производителем. Эта функция также должна быть реализована в брокере сообщений, но у kafka такой возможности нет.
Kafka реализует гибридное решение между чистым pub / sub и очередью. При этом вы можете подписаться на topi c с одной или несколькими группами потребителей (подробнее об этом позже) и отфильтровывать все сообщения, которые вам не интересны, проверяя сами сообщения. В мире обмена сообщениями и EIP этот шаблон известен как Array of Filters. Как вы можете себе представить, это произойдет после того, как сообщение будет передано всем подписчикам; поэтому, если это решение не соответствует вашим требованиям или контексту, то вы можете подумать о реализации основанного на контенте маршрутизатора, который предназначен для отправки сообщения подмножеству потребителей только под вашим централизованным управлением (это подразумевало бы промежуточное указание потребителя c каналы, которые могут быть другими темами Kafka или Seda / VM-очередями, конечно).
Переходя ко второму вопросу, вот официальный веб-сайт Kafka Component: https://camel.apache.org/components/latest/kafka-component.html. Чтобы создать разные группы потребителей, вам просто нужно определить несколько маршрутов, каждый из которых имеет выделенный идентификатор группы. Добавляя свойство groupdId, вы будете информировать координаторов Consumer Group (которые находятся у брокеров Kafka) о существовании нескольких отдельных групп потребителей, и брокеры будут использовать их для того, чтобы различать их и рассматривать отдельно (отправив им копию каждого из них). сообщение журнала, хранящееся в topi c) ...
Вот пример:
public void configure() throws Exception {
from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
"&groupId=myFirstConsumerGroup"
.log("Message received by myFirstConsumerGroup : ${body}");
from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
"&groupId=mySecondConsumerGroup"
.log("Message received by mySecondConsumerGroup : ${body}");
}
Как вы можете видеть, я создал два маршрута в одном RouteBuilder, если не сказать в том же Java процессе. Это очень плохое проектное решение в большинстве случаев, о которых я могу подумать, потому что нет единой ответственности, отдельных проблем, и они не будут масштабироваться. Но опять же, это зависит от ваших требований / контекста. Для полноты, пожалуйста, рассмотрите все другие свойства компонентов Kafka, так как может быть много других конфигураций, которые вас интересуют, таких как количество потоков потребителей в группе. Я пытался оставаться на высоком уровне, чтобы начать обсуждение ... Я отредактирую свой ответ в случае новых обновлений от вас. Надеюсь, я помог!