Отделение Кафки от Приложения - PullRequest
0 голосов
/ 19 сентября 2018

У меня есть приложение, которое получает большое количество запросов GET (около 250000 за 5 минут).Приложение анализирует параметры запроса и публикует их в Kafka.Код для публикации выглядит следующим образом:

public class KafkaProcessor {

  private static final String BATCH_SIZE = "batch.size";
  private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
  private static final String PRODUCER_TYPE = "producer.type";
  private static final String VALUE_SERIALIZER = "value.serializer";
  private static final String KEY_SERIALIZER = "key.serializer";
  private static final String METADATA_BROKER_LIST = "bootstrap.servers";
  private static final String MAX_BLOCK_MS = "max.block.ms";
  private static final String KAFKA_ENABLED = "enabled";

  private static Properties props = new Properties();
  private static KafkaProducer<String, String> producer;
  private static ProducerRecord<String, String> producerRecord;
  private static String topic;


  static {
    boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
    if (isEnabled) {
      //Setting up a producer configuration.
      props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
      props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(PRODUCER_TYPE, "async");
      props.put(REQUEST_REQUIRED_ACKS, "1");
      props.put(BATCH_SIZE, "1000");
      props.put(MAX_BLOCK_MS, "10000");
      producer = new KafkaProducer<>(props);
      topic = "pixel-server";
    }
  }


  private static void publishToKafka(JSONObject data) {
      producerRecord = new ProducerRecord<String, String>(topic, data.toString());
      producer.send(producerRecord, new Callback() {
        @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
          if (exception != null) {
            exception.printStackTrace();
          }
        }
      });
  }
}

Мое приложение размещено в экземпляре AWS.Сервер Kafka также находится на другом компьютере AWS.

Однако, если kafka не работает или kafka медленно реагирует по какой-либо причине, мое приложение зависает и не может обрабатывать какой-либо запрос дальше.Я хочу знать, как я могу сделать свое приложение независимым от Кафки, то есть, если кафка выходит из строя (или медленно реагирует), это не должно влиять на мое приложение.

Я пробовал несколько способов, например, еслиkafka выдает тайм-аут, затем подсчитывает число исключений тайм-аута и прекращает публикацию в kafka, но, поскольку объем запроса очень велик, поэтому к тому времени я получаю любое исключение тайм-аута, мое приложение зависает.

Любая помощь или указательбудет оценено.

Я использую Kafka 0.8.2.Мой сервер в Vertx.ОС используется в Ubuntu.Улимит установлены на макс.

1 Ответ

0 голосов
/ 20 сентября 2018

Предполагая, что у вас есть три или более узлов в кластере Kafka (что жизненно важно для любого высоконагруженного приложения), вы можете попробовать некоторые приемы:

  1. Попробуйте установить acks Конфигурация производителя на 0.Это повлияет на согласованность вашего приложения (некоторые сообщения могут быть отброшены на стороне производителя и будут потеряны навсегда).Документация гласит:

    Если установлено в ноль, то производитель вообще не будет ждать никакого подтверждения от сервера.Запись будет немедленно добавлена ​​в буфер сокета и будет считаться отправленной.Невозможно гарантировать, что сервер получил запись в этом случае

  2. Установите max.block.ms производитель config на 0.Это заставит ваше приложение немедленно генерировать TimeoutException при каждой отправке в кластер Kafka без какой-либо блокировки, , но только при переполнении буфера памяти . Обратите внимание, что это влияет только на блокировку на стороне клиента, а не на сетевые вызовы!

  3. Уменьшите request.timeout.ms до небольших значений (например, 10 или 100).Это приведет к тому, что клиент Kafka сгенерирует исключение TimeoutException для любой сетевой операции, которая занимает больше времени, чем request.timeout.ms.

И еще несколько советов:

  1. обновите ваши экземпляры Kafka до последних версий, чтобы получить более стабильный кластер;

  2. * Для обеспечения высокой доступности 1040 * ваш кластер Kafka должен содержать как минимум три узла (и всегда нечетное количество узлов, чтобы избежать условия разделения мозга )
  3. вам следует попробовать поиграть с max.batch.size и linger.ms настройками производителя, чтобы достичь оптимального коэффициента пропускной способности для вашего приложения

...