Как исправить 'Не удалось обновить метаданные после xxx ms' в spring-kafka при отправке записи в обратном вызове - PullRequest
0 голосов
/ 02 февраля 2019

spring-kafka не может отправить запись в обратном вызове

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("log error...");
        }

        @Override
        public void onSuccess(SendResult<String, String> result) {
            kafkaTemplate.send("anotherTopic", "key", "data");
        }
    });

Kafka throw не удалось обновить метаданные, когда я вызываю kafkaTemplate.send () в onSuccess (), что не ожидается

1 Ответ

0 голосов
/ 02 февраля 2019

Похоже, что вы не можете выполнять операции производителя в потоке обратного вызова - kafka-producer-network-thread - возможно, какая-то тупиковая ситуация в коде производителя - в ожидании получения метаданных, которые будут использовать тот же поток, поэтому время ожидания истекло.

Вам, вероятно, понадобится второй KafkaTemaplate (и фабрика производителя, потому что фабрика по умолчанию всегда возвращает одного и того же производителя).

Или просто выполните вторую передачу в другом потоке ...

@SpringBootApplication
public class So54492871Application {

    private static final ExecutorService exec = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        SpringApplication.run(So54492871Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54492871-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54492871-2", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");
            future.addCallback(result -> {
                System.out.println(Thread.currentThread().getName() + ":" + result);
                exec.execute(() -> {
                    ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");
                    future2.addCallback(result2 -> {
                        System.out.println(Thread.currentThread().getName() + ":" + result2);
                    }, ex -> {
                        System.out.println(ex.getMessage());
                    });
                });
            }, ex -> {
                System.out.println(ex.getMessage());
            });
            System.in.read();
        };
    }

}
...