Мой слушатель kafka должен обрабатывать сообщения в последовательном порядке, метод onMessage должен обрабатывать сообщения синхронно, я не хочу, чтобы мой слушатель обрабатывал несколько сообщений одновременно, метод onmessage сначала останавливается
org.springframework.kafka.listener.MessageListenerContainer
затем передает полезную нагрузку синхронизированному методу, после завершения обработки запускает прослушиватель обратно. Другими вариантами выбора являются использование очереди блокировки, службы исполнителя и т. Д., Нужен совет по лучшей стратегии для достижения этой цели. Есть ли у потребителя kafka какие-либо функции, предназначенные для последовательной обработки сообщений?
вот мой код.
Я изменил реализацию на этот
public static class KafkaReadMsgTask implements Runnable{
@Override
public void run() {
KakfaMsgConumerImpl kakfaMsgConumerImpl=null;;
try{
kakfaMsgConumerImpl=SpContext.getBean(KakfaMsgConumerImpl.class);
kakfaMsgConumerImpl.pollFormDef();
kakfaMsgConumerImpl.pollFormData();
} catch (Exception e){
logger.error(" kafka listener errors "+e);
kakfaMsgConumerImpl.pauseTask();
}
}
}
@Component
public static class KakfaMsgConumerImpl {
@Autowired
ObjectMapper mapper;
@Autowired
FormSink formSink;
@Autowired
Environment env;
@Resource(name="formDefConsumer")
Consumer formDefConsumer;
@Resource(name="formDataConsumer")
Consumer formDataConsumer;
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public void startPolling() throws Exception{
executor.scheduleAtFixedRate(new KafkaReadMsgTask(),10, 3,TimeUnit.SECONDS);
}
public void pauseTask(){
try{
Thread.sleep (120000l);
}catch(Exception e){
throw new RuntimeException(e);
}
}
public void pollFormDef() throws Exception{
ConsumerRecords<Long, String> records =formDefConsumer.poll(0);
if(!records.isEmpty()){
int recordsCount=records.count();
if(logger.isDebugEnabled()){
logger.debug(" form-def consumer poll records size "+recordsCount);
}
if(records.count()>1){
logger.warn(" form-def consumer poll returned records more than 1 , expected 1 , received "+recordsCount);
}
ConsumerRecord<Long,String> record= records.iterator().next();
processFormDef(record.key(), record.value());
}
}
void pollFormData() throws Exception{
ConsumerRecords<Long, String> records =formDataConsumer.poll(0);
if(!records.isEmpty()){
int recordsCount=records.count();
if(logger.isDebugEnabled()){
logger.debug(" form-data consumer poll records size "+recordsCount);
}
if(records.count()>1){
logger.warn(" form-data consumer poll returned records more than 1 , expected 1 , received "+recordsCount);
} ConsumerRecord<Long,String> record= records.iterator().next();
processFormData(record.key(), record.value());
}
}
void processFormDef(Long key, String msg) throws Exception{
if(logger.isDebugEnabled()){
logger.debug(" key "+key+" payload : "+msg);
}
FormDefinition formDefinition= mapper.readValue(msg, FormDefinition.class);
formSink.createFromDef(formDefinition);
logger.debug(" processed message, key: "+key+ " msg : "+msg);
Thread.sleep(60000l);
}
void processFormData(Long key, String msg) throws Exception{
if(logger.isDebugEnabled()){
logger.debug(" key "+key+" payload : "+msg);
}
FormData formData= mapper.readValue(msg, FormData.class);
formSink.persists(formData);
logger.debug(" processed message, key: "+key+ " msg : "+msg);
Thread.sleep(60000l);
}
}