Я сталкиваюсь с ошибкой ниже, когда пытаюсь вставить сообщение в Kafka через API производителя
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for metadata_notification-0: 30036 ms has passed since batch creation plus linger time
В чем может быть проблема с этим?
Более код ниже:
protected void produceMessage(String topic, String orgId, Map<String, byte[]> header, String payload) throws Exception {
System.out.println("In Produce Message Method");
final Producer<String, String> producer = createMetaProducer();
long time = System.currentTimeMillis();
try {
List<Header> messageHeaders = header.entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
System.out.println("Send header");
LOGGER.info("Sending headers:");
for(Header mh: messageHeaders) {
LOGGER.info("{}: {}",mh.key(),new String(mh.value()));
}
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, orgId, payload, messageHeaders);
RecordMetadata metadata = producer.send(record).get();
long elapsedTime = System.currentTimeMillis() - time;
System.out.printf("sent record(key=%s value=%s) " +
"meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime);
} finally {
producer.flush();
producer.close();
}
}
В этот момент, как показано ниже, происходит сбой, и выдает ошибку, указанную выше - TimeoutException
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, orgId, payload, messageHeaders);
RecordMetadata metadata = producer.send(record).get();
В МОЕМ случае кафка - это изображение докера