Мне нужно написать службу отдыха с загрузкой Spring Kafka для динамического создания потребителей с указанным c идентификатором потребителя и заданным c начальным смещением. Если потребитель уже существует, я должен перемотать его. но теперь потребитель создается.
когда я отправляю http://localhost: 8080 / rewindMessage? consumerID ="system1" & newMessageOffset = 2
у меня всегда есть: информация о потребителе: (memberId = consumer-1-4951c570-c53f-4a29-aa86-9aaecebdd037, clientId = потребитель-1, хост = / 127.0.0.1, назначение = (topicPartitions = back-0)) я не могу понять, почему потребитель не создан и почему clientId не равен тому, что я написал.
public class ReceiverController {
@PutMapping(value = "/rewindMessage")
public ResponseEntity<ObjectNode> rewindMessage(@RequestParam("consumerID") String consumerID,@RequestParam("newMessageOffset") long newMessageOffset) {
ObjectNode response=this.receiver.rewind(consumerID,newMessageOffset);
ResponseEntity<ObjectNode> entity=ResponseEntity.status(HttpStatus.valueOf(response.get("httpStatus").intValue())).body(response);
return entity;
}
public class Receiver {
public ObjectNode rewind(String consumerID, long newMessageOffset) {
LOG.info("rewindMessage.consumerID=>" + consumerID);
LOG.info("rewindMessage.newMessageOffset=>" + newMessageOffset);
ObjectMapper mapper = new ObjectMapper();
ObjectNode response = mapper.createObjectNode();
if (consumerID != null) {
AdminClient client = createAdminClient();
DescribeConsumerGroupsResult result = client.describeConsumerGroups(Collections.singleton(this.groupID));
Iterator<MemberDescription> it = result.describedGroups().get(this.groupID).get().members().stream().iterator();
while (it.hasNext()) {
System.out.println("consumer information: " + it.next().toString());
}
int n = (int) result.describedGroups().get(this.groupID).get().members().stream()
.filter(v -> v.clientId().equals(consumerID)).count();
if(n==0)
{
this.config = new ReceiverConfig();
ConcurrentMessageListenerContainer<String, String> container = this.config.createContainer(consumerID, "DMS",
newMessageID);
//this.config.createConsumer(consumerID, "DMS", newMessageID);
response.put("timestamp", new Timestamp(System.currentTimeMillis()).toString());
response.put("errorCode", "201");
response.put("httpStatus", 201);
response.put("message", "consumerID " + consumerID + " rewound to consumerID " + newMessageID + ".");
}
}
return response;
}
@EnableKafka
public class ReceiverConfig {
private String bootstrapServers = "localhost:9092";
private String topic = "back";
Map<String, Object> properties;
private ConcurrentKafkaListenerContainerFactory<String, String> factory;
ReceiverConfig() {
this.factory=new ConcurrentKafkaListenerContainerFactory<String, String>();
this.factory.getContainerProperties().setIdleEventInterval(5000L);
this.factory.setConsumerFactory(consumerFactory());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
ConcurrentMessageListenerContainer<String, String> createContainer(String consumerId, String groupId, long messageId){
TopicPartitionOffset offset= new TopicPartitionOffset(topic,0,messageId,false);
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(offset);
container.getContainerProperties().setGroupId(groupId);
container.getContainerProperties().setClientId(consumerId);
container.getContainerProperties().setMessageListener(new BackChannelListener());
return container;
}
}