Каково максимальное количество асин c потоков, созданных для ответа kafkatemplate asyn c - PullRequest
1 голос
/ 24 апреля 2020

"ForkJoinPool создается с заданным целевым уровнем параллелизма; по умолчанию он равен числу доступных процессоров."

Предположим, что мой ЦП имеет 2 ядра. Итак, число максимальных потоков, созданных ForkJoinPool, равно 4?

Предположим, я выполняю асинхронную операцию, которая возвращает объект будущего в операции al oop (скажем, 10k), которая использует Forkpool по умолчанию ... тогда как много потоков будет создано Forkpool?

List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<ListenableFuture<SendResult<String, String>>>();

future = kafkaTemplate.send(topicName, message);
cf.add(future);

i++;

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
        syso("sent success");
    }

    @Override
    public void onFailure(Throwable ex) {
        System.out.println(" sending failed");
    }
});

И, в каком-то другом классе, я проверяю, закончилось ли все будущее или нет:

    for (ListenableFuture<SendResult<String, String>> m : myFutures) {
        m.get();
    }

1 Ответ

2 голосов
/ 25 апреля 2020

Нет дополнительных потоков; фьючерсы завершаются в потоке ввода-вывода производителя.

Вот тест, который показывает обратные вызовы ...

@SpringBootApplication
public class So61415751Application {


    private static final Logger LOG = LoggerFactory.getLogger(So61415751Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So61415751Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
                LOG.info(recordMetadata.toString());
            }
        });
        return args -> {
            IntStream.range(0, 9).forEach(i -> template.send("so61415751", "foo" + i));
            LOG.info("Sent");
            Thread.sleep(10_000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61415751").partitions(1).replicas(1).build();
    }

}
spring.kafka.producer.properties.linger.ms=3000

#logging.level.org.springframework.kafka=debug

logging.level.org.apache.kafka=debug

результат

2020-04-24 17:27:46.282  INFO 96084 --- [           main] com.example.demo.So61415751Application   : Sent

...

3 second linger

...

2020-04-24 17:27:49.299  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@63
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@64
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@65
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@66
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@67
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@68
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@69
2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@70
2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@71

(Поток вызовов ProducerListener также завершает будущее).

...