Spring + Kafka: клиенты должны использовать неблокирующий ввод-вывод для реализации конвейеризации запросов и повышения пропускной способности - PullRequest
0 голосов
/ 21 февраля 2020

Справочная информация: я давно использовал Kafka + Spring для журналов, в основном следуя этому простому подходу: создайте инициализатор проекта Spring Boot, используйте kafkatemplate для создания / отправки журналов в kafka topi c и у меня есть как минимум две установки kafka как кластер. Пока все хорошо.

Текущая задача: я должен предложить корпоративное решение для регистрации всех ошибок всех микросервисов. Все микросервисы закодированы с помощью Spring Boot. Я создал отдельную топику c для каждого приложения и соответственно создам кластер кафок.

Моя проблема в том, что я немного запутался, если я выбрал правильный подход, используя kafkatemplate с конфигурацией Spring Boot по умолчанию, и я пытаюсь глубже понять концепции.

С apache .ork wiki Я читаю:

The client will likely need to maintain a connection to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data. However it should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).

The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.

Таким образом, он говорит: «... только один запрос в полете на соединение ... должен использовать неблокирующий ввод-вывод». Хорошо, я понимаю, что при использовании kafkatemplate и @ EnableAsyn c я по умолчанию получаю «неблокирующий» подход, поскольку он вернет Future (я думаю, мне не следует использовать kafkatemplate без включения asyn c в моем KafkaProducerConfig).

Но представьте, что в моих журналах ошибок в микросервисе произошел неожиданный «бум», поскольку некоторые клиенты этого микросервиса сильно увеличивают запрос и вызывают то же исключение в виде бесконечного l oop. Хорошо, скажем, у меня есть 10 экземпляров одного и того же микросервиса (и у меня есть этот сценарий), создающих журналы execpitons для одной и той же kafka topi c (kafka также является балансировкой нагрузки, скажем, тремя экземплярами).

Поскольку у меня был исключительный бум, я понимаю, что будут многочисленные потоки (каждый на соответствующее будущее, пока оно не будет решено), отправляющие сообщения, иными словами, каждый поток удерживается во время соединения. Мне известно, что «... обычно не требуется поддерживать несколько соединений с одним посредником из одного экземпляра клиента (т. Е. Пул соединений)». и в будущем соединение с kafka, вероятно, будет решено гораздо быстрее, чем соединение db или http. Но, как уже упоминалось, «не должно быть вообще» необходимо. Просто предположите, что это действительно БОЛЬШОЙ БУМ, и многие фьючерсы с его собственной темой запущены.

Мой главный вопрос: если предположить такую ​​возможность, работа с kafkatemplate + asyn c config + cluster of kafka - это хорошее эмпирическое правило или есть какая-то концепция или распространенный идентификатор, который я не принимаю во внимание?

Полезные комментарии могут быть очень полезны:

  • если я кодирую kafkatemplate как одиночный файл, или эта общая конфигурация достаточна для случаев регистрации:

    @EnableAsync @Configuration publi c class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }
    
    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }
    
  • в этом обсуждении stackoverflow Я нашел кого-то, кто предложил уменьшить max.block.ms, чтобы избежать " ... когда возникает проблема с подключением (например, сервер отключен), результат становится не асинхронным, а метод остается заблокированным ". Если я уменьшу слишком много, могу ли я потерять сообщения журнала, и если я уйду слишком высоко, я рискну взломать sh микросервис, так как он будет продолжать создавать новые потоки, каждый для каждого будущего не будет решен? Если бы я не сказал что-нибудь глупое sh, обходной путь или хорошую практику в этом случае? Например, после того, как определенное число войдет в журнал локально в файле и найдет вторичное решение с filebeat, чтобы восстановить его позже?

  • что за и против принимать во внимание путем изменения ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION по моему сценарию?

...