Я только начал работать с Kafka 2.1.0 сегодня и изучаю API производителя.У меня есть следующий код:
public class KafkaCheck {
public static void main(String[] args) throws Exception {
final Map<String, Object> cfg = new HashMap<>();
cfg.put("bootstrap.servers", "localhost:9092");
cfg.put("acks", "all");
cfg.put("enable.idempotence", true);
cfg.put("delivery.timeout.ms", 30000);
final KafkaProducer<String, String> client = new KafkaProducer<>(
cfg, new StringSerializer(), new StringSerializer());
try {
System.out.println("making call");
client.partitionsFor("veryimportanttopicindeed");
} catch (KafkaException e) {
System.out.println("closing");
client.close(0, TimeUnit.MILLISECONDS);
throw new RuntimeException("Well this is FUBAR");
}
}
}
Когда Кафка не работает, partitionsFor вызывает блокировку в течение 60 секунд, как и ожидалось, но затем close вызывает блоки, по-видимому, навсегда:
making call
2018-12-13 15:27:27,955 WARN o.a.k.c.NetworkClient [Producer clientId=producer-1]
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker
may not be available.
***snip***
closing
2018-12-13 15:28:28,683 WARN o.a.k.c.NetworkClient [Producer clientId=producer-1]
Connection to node -1 (localhost/127.0.0.1:9092) could not be established.
Broker may not be available.
***snip***
2018-12-13 15:32:52,667 WARN o.a.k.c.NetworkClient [Producer clientId=producer-1]
Connection to node -1 (localhost/127.0.0.1:9092) could not be established.
Broker may not be available.
Я не вижу ничего в настройках производителя , которое могло бы повлиять на это.Как я могу заставить клиента закрыться?