Я использую поток Кафки для некоторых приложений.
Поток потока, как показано ниже
kafkaProducer---->StreamerConsumer1->finalCosumer
У меня есть продюсер, который очень быстро записывает данные, и мой StreamConsumer сопоставляет каждый поток с каким-либо процессом и перенаправляет поток в другую тему.
в моей карте StreamCosumer я добавил свою собственную функцию отображения, которая на самом деле пытается сохранить соответствующие данные, как показано ниже
public void checkRecord(T1 key, T2 value) {
switch(T1.toString()){
case "key1":
//Get relavant fileds from value and perisit in db
break;
case "key2":
//Get relavant fileds from value and perisit in db
break;
}
}
KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());
pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));
Теперь моя функция потребителя записей checkRecord является однопоточной, и для ее возврата почти 300 мс (из-за некоторой бизнес-логики и сохранения базы данных, которых я не могу избежать).
Я не могу увеличить количество разделов, так как были некоторые ограничения от нашей инфраструктуры, а также из-за ограничений ниже
More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency
поэтому я планирую написать многопоточный потребитель потока.
Но меня беспокоят следующие пункты.
- Мне нужно обработать запись только один раз
- Передача другому потоку вызовет проблемы с управлением смещением.
Так как увеличить пропускную способность?
У меня достаточно ресурса на потребителя, только 40% его ресурса используется.