Настройка Kafka для задержки, потери пакетов и недоступности - PullRequest
0 голосов
/ 25 апреля 2020

Я пытаюсь оптимизировать производительность Kafka в сценарии, где есть высокая задержка (> 500 мс) и периодическая потеря пакетов. Я работаю с JAVA и использую API-интерфейс kafka_2.13, версия: 2.5.0. У меня 24 узла, подключенных к одному брокеру, каждый узел пытается отправить небольшое сообщение всем остальным подписчикам. Я наблюдаю, что все узлы способны обмениваться данными, когда нет потери пакетов или задержки, но, похоже, они не могут обмениваться данными вскоре после того, как я добавлю задержку и потерю пакетов. Я проведу больше тестов в понедельник, но мне было интересно, есть ли у кого-нибудь какие-либо предложения по возможным улучшениям конфигурации.

После этого вы можете увидеть код, который я вижу, чтобы опубликовать sh и получать сообщения, а затем различные конфигурации, которые используется для потребителей и производителей.

Издатели:

boolean sendAsyncMessage (byte[] value, String topic) {
    ProducerRecord<Long, byte[]> record = new ProducerRecord<> (topic, System.currentTimeMillis (), value);
    long msStart = System.currentTimeMillis ();
    producer.send (record, (metadata, exception) -> {
        long msDelta = System.currentTimeMillis () - msStart;
        logger.info ("Message with topic {} sent at {}, was ack after {}", topic, msStart, msDelta);
        if (metadata == null) {
            logger.info ("An exception was triggered during send:" + exception.toString ());
        }
    });
    producer.flush ();
    return true;
}

Подписчики:

while (keepGoing.get ()) {
    try {
        // java example do it every time!
        subscribe ();
        ConsumerRecords<Long, byte[]> consumerRecords = consumer.poll (Duration.ofMillis (2000));
        manageMessage (consumerRecords);
        //Thread processRecords = new Thread (() -> manageMessage (consumerRecords));
        //processRecords.start ();
    } catch (Exception e) {
        logger.error ("Problem in polling: " + e.toString ());
    }
}

Производитель:

properties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_SERIALIZER);
properties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_SERIALIZER);
properties.put (ProducerConfig.ACKS_CONFIG, reliable ? "1" : 0);
// host1:port1,host2:port2
properties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// how many bytes to buffer records waiting to be sent to the server
//properties.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put (ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

properties.put (ProducerConfig.CLIENT_ID_CONFIG, clientID);
//15 MINUTES
properties.put (ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 54000000);
// MAX UNCOMPRESSED MESSAGE SIZE
// properties.put (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// properties.put (ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
properties.put (ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 300);
properties.put (ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

Потребитель

properties.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_DESERIALIZER);
properties.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_DESERIALIZER);
// host1:port1,host2:port2
properties.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// should be the topic
properties.put (ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.put (ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put (ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

Ответы [ 2 ]

1 голос
/ 25 апреля 2020

Прежде чем пытаться изменить все настройки, я бы сделал несколько изменений в вашей логике c:

  • Производитель

    В настоящее время вы звоните flush() после отправки каждого сообщения эффективно выполняется синхронная отправка. Это не рекомендуется, поскольку вынуждает клиента Kafka делать запросы к кластеру для каждого отдельного сообщения. Это довольно неэффективно. В большинстве случаев лучше позволить клиенту решить, когда на самом деле отправлять сообщения, а не использовать flush().

  • Потребитель

    В каждой итерации вы вызываете subscribe(), это не нужно. Вам следует звонить subscribe() только тогда, когда вы хотите изменить подписку. Также не рекомендуется создавать новую тему в каждом poll() l oop! Помимо медленной работы, вы рискуете создать сотни или тысячи потоков, если потребитель начнет получать большое количество сообщений.

Кафка использует протокол TCP, поэтому потерянные пакеты следует автоматически повторить , По умолчанию клиенты Kafka настроены на повторение большинства операций и автоматическое повторное подключение к брокерам в случае потери соединения.

При выполнении тестов перед изменением конфигурации вы должны увидеть, как ведет себя клиент Kafka, отслеживая его показатели и журналы. Достигнуты ли таймауты из-за задержки? Сообщения повторяются?

0 голосов
/ 29 апреля 2020

В конце концов, самым большим фактором, который мешал моей распределенной системе правильно взаимодействовать, была опция acks производителя. Вначале мы установили это значение на all (самый строгий вариант), и, похоже, что в сочетании с поврежденной сетью Kafka не давал производительности, аналогичной другим протоколам на основе TCP. Теперь мы используем 0 для ненадежных сообщений и 1 для надежных.

...