Spring для Kafka 2.3 устанавливает смещение во время выполнения, если потребитель существует, иначе создайте нового потребителя - PullRequest
0 голосов
/ 05 марта 2020

Мне нужно написать службу отдыха с загрузкой Spring Kafka для динамического создания потребителей с указанным c идентификатором потребителя и заданным c начальным смещением. Если потребитель уже существует, я должен перемотать его. но теперь потребитель создается.

когда я отправляю http://localhost: 8080 / rewindMessage? consumerID ="system1" & newMessageOffset = 2

у меня всегда есть: информация о потребителе: (memberId = consumer-1-4951c570-c53f-4a29-aa86-9aaecebdd037, clientId = потребитель-1, хост = / 127.0.0.1, назначение = (topicPartitions = back-0)) я не могу понять, почему потребитель не создан и почему clientId не равен тому, что я написал.

    public class ReceiverController {
        @PutMapping(value = "/rewindMessage")
                public ResponseEntity<ObjectNode> rewindMessage(@RequestParam("consumerID") String consumerID,@RequestParam("newMessageOffset") long newMessageOffset) {
                    ObjectNode response=this.receiver.rewind(consumerID,newMessageOffset);
                    ResponseEntity<ObjectNode> entity=ResponseEntity.status(HttpStatus.valueOf(response.get("httpStatus").intValue())).body(response);

                    return entity;
                }

    public class Receiver {
    public ObjectNode rewind(String consumerID, long newMessageOffset) {
            LOG.info("rewindMessage.consumerID=>" + consumerID);
            LOG.info("rewindMessage.newMessageOffset=>" + newMessageOffset);
            ObjectMapper mapper = new ObjectMapper();

            ObjectNode response = mapper.createObjectNode();

            if (consumerID != null) {


            AdminClient client = createAdminClient();

            DescribeConsumerGroupsResult result = client.describeConsumerGroups(Collections.singleton(this.groupID));

                Iterator<MemberDescription> it = result.describedGroups().get(this.groupID).get().members().stream().iterator();
                while (it.hasNext()) {
                    System.out.println("consumer information: " + it.next().toString());
                }

                int n = (int) result.describedGroups().get(this.groupID).get().members().stream()
                        .filter(v -> v.clientId().equals(consumerID)).count();
                if(n==0)
             {

                            this.config = new ReceiverConfig();
                            ConcurrentMessageListenerContainer<String, String> container = this.config.createContainer(consumerID, "DMS",
                                    newMessageID);
                            //this.config.createConsumer(consumerID, "DMS", newMessageID);
                            response.put("timestamp", new Timestamp(System.currentTimeMillis()).toString());
                            response.put("errorCode", "201");
                            response.put("httpStatus", 201);
                            response.put("message", "consumerID " + consumerID + " rewound to consumerID " + newMessageID + ".");

                        }
                                        } 

            return response;

        }


        @EnableKafka
    public class ReceiverConfig {


        private String bootstrapServers = "localhost:9092";

        private String topic = "back";
        Map<String, Object> properties;


        private  ConcurrentKafkaListenerContainerFactory<String, String> factory;

        ReceiverConfig() {
            this.factory=new ConcurrentKafkaListenerContainerFactory<String, String>();
            this.factory.getContainerProperties().setIdleEventInterval(5000L);
            this.factory.setConsumerFactory(consumerFactory());


        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            return props;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }


        ConcurrentMessageListenerContainer<String, String> createContainer(String consumerId, String groupId, long messageId){


        TopicPartitionOffset offset= new TopicPartitionOffset(topic,0,messageId,false);

        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(offset);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setClientId(consumerId);
        container.getContainerProperties().setMessageListener(new BackChannelListener());
        return container;
        }
}

1 Ответ

0 голосов
/ 05 марта 2020

consumerConfigs() никогда не возвращает карту, содержащую client.id или group.id

Если вы хотите найти / зафиксировать потребителя, вам также необходимо получить экземпляр KafkaConsumer

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...