Кафка увеличивает пропускную способность с несколькими разделами и несколькими потребительскими потоками - PullRequest
0 голосов
/ 16 апреля 2019

Я использую поток Кафки для некоторых приложений.

Поток потока, как показано ниже

kafkaProducer---->StreamerConsumer1->finalCosumer

У меня есть продюсер, который очень быстро записывает данные, и мой StreamConsumer сопоставляет каждый поток с каким-либо процессом и перенаправляет поток в другую тему.

в моей карте StreamCosumer я добавил свою собственную функцию отображения, которая на самом деле пытается сохранить соответствующие данные, как показано ниже

public void checkRecord(T1 key, T2 value) {
 switch(T1.toString()){
 case "key1":
  //Get relavant fileds from value and perisit in db 
   break;
   case "key2":
     //Get relavant fileds from value and perisit in db 
   break;
 }
}


KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());

pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));   

Теперь моя функция потребителя записей checkRecord является однопоточной, и для ее возврата почти 300 мс (из-за некоторой бизнес-логики и сохранения базы данных, которых я не могу избежать).

Я не могу увеличить количество разделов, так как были некоторые ограничения от нашей инфраструктуры, а также из-за ограничений ниже

More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency

поэтому я планирую написать многопоточный потребитель потока.

Но меня беспокоят следующие пункты.

  1. Мне нужно обработать запись только один раз
  2. Передача другому потоку вызовет проблемы с управлением смещением.

Так как увеличить пропускную способность?

У меня достаточно ресурса на потребителя, только 40% его ресурса используется.

1 Ответ

1 голос
/ 16 апреля 2019

Вы можете установить конфигурацию потока num.stream.threads, чтобы настроить количество потоков.Максимальным значением может быть максимальное количество разделов.Это помогает увеличить параллельность экземпляра приложения.

Допустим, если в вашей теме 4 раздела, вы можете установить следующее:

properties.set("num.stream.threads",4);
...