Потребление кафки сообщения от Camel Router - PullRequest
0 голосов
/ 08 марта 2019

Я могу создать сообщение для Кафки с маршрутизатора Camel, но не могу получить созданное сообщение. Мой код такой, как показано ниже

Я использую camel-kafka v2.17.0 и Apache Camel v2.22.0

//to Produce the message which is successful    
from("direct:start").process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    System.out.println("Sending to Kafka");
                    exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                    exchange.getIn().setHeader(KafkaConstants.KEY, "1");
                }
            }).to("kafka:localhost:9092?topic=eventTopic").to("stream:out");

//To consume the message which is failing i have doubts with group ID
            from("kafka:localhost:9092?topic=eventTopic&groupId=foo&autoOffsetReset=earliest&consumersCount=1")
            .process(new Processor() {

                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Recieved from Kafka");
                            String messageKey = "";
                            if (exchange.getIn() != null) {
                                Message message = exchange.getIn();
                                Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
                                String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
                                if (message.getHeader(KafkaConstants.KEY) != null)
                                    messageKey = (String) message.getHeader(KafkaConstants.KEY);
                                Object data = message.getBody();

                                System.out.println("topicName :: " + topicName + " partitionId :: " + partitionId
                                        + " messageKey :: " + messageKey + " message :: " + data + "\n");
                            }
                        }
                    }).to("log:input");
...