Почему производитель asyn c создает сообщения с linger.ms и batch.size, для которых установлено большое значение, а для autoFlu sh установлено значение false? - PullRequest
0 голосов
/ 09 июля 2020

Я использую spring-kafka 2.2.8 и пишу простой производитель asyn c со следующими настройками:

producer config key : compression.type  and value is : none
producer config key : request.timeout.ms  and value is : 10000
producer config key : acks  and value is : all
producer config key : batch.size  and value is : 33554431
producer config key : delivery.timeout.ms  and value is : 1210500
producer config key : retry.backoff.ms  and value is : 3000
producer config key : key.serializer  and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : security.protocol  and value is : SSL
producer config key : retries  and value is : 3
producer config key : value.serializer  and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer
producer config key : max.in.flight.requests.per.connection  and value is : 1
producer config key : linger.ms  and value is : 1200000
producer config key : client.id  and value is : <<my app name>>

Я распечатал указанные выше настройки производителя, используя фрагмент кода ниже:

DefaultKafkaProducerFactory defaultKafkaProducerFactory = (DefaultKafkaProducerFactory) mykafkaProducerFactory;
        Set<Entry> set  = defaultKafkaProducerFactory.getConfigurationProperties().entrySet();
        set.forEach( item ->
                System.out.println("producer config key : "+item.getKey()+"  and value is : "+item.getValue())
        );

Теперь я создаю KafkaTemplate с autoFlu sh как false, вызывая приведенный ниже конструктор

public KafkaTemplate(mykafkaProducerFactory, boolean autoFlush) 

Теперь у меня есть производитель asyn c, создающий 10 сообщений в пролет 10 сек c. Затем, к удивлению, я получил все 10 сообщений, опубликованных на topi c за несколько секунд, и я уверен, что размер этих 10 сообщений в совокупности намного меньше, чем размер моего пакета. Размер: 33554431

Теперь мой вопрос составляет

  1. Почему сообщения публикуются вместо ожидания либо linger.ms, либо batch.size перед созданием сообщения?

1 Ответ

0 голосов
/ 09 июля 2020

Похоже, вы неправильно устанавливаете эти свойства; покажите, как вы их устанавливаете. Я только что протестировал

batch.size=1000000
linger.ms=10000

и отправил 10 сообщений подряд, и потребовалось ровно 10 секунд, чтобы они прибыли к потребителю.

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch

spring.kafka.producer.properties.batch.size=1000000
spring.kafka.producer.properties.linger.ms=10000
@SpringBootApplication
public class So62820095Application {


    private static final Logger LOG = LoggerFactory.getLogger(So62820095Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62820095Application.class, args);
    }

    @KafkaListener(id = "so62820095", topics = "so62820095")
    public void listen(List<String> in) {
        LOG.info(in.toString());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62820095").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0,  10).forEach(i -> template.send("so62820095", "foo" + i));
    }

}
...