потребитель kafka выключает время выполнения, обрабатывает сообщения последовательно - PullRequest
0 голосов
/ 29 июня 2018

Мой слушатель kafka должен обрабатывать сообщения в последовательном порядке, метод onMessage должен обрабатывать сообщения синхронно, я не хочу, чтобы мой слушатель обрабатывал несколько сообщений одновременно, метод onmessage сначала останавливается

org.springframework.kafka.listener.MessageListenerContainer

затем передает полезную нагрузку синхронизированному методу, после завершения обработки запускает прослушиватель обратно. Другими вариантами выбора являются использование очереди блокировки, службы исполнителя и т. Д., Нужен совет по лучшей стратегии для достижения этой цели. Есть ли у потребителя kafka какие-либо функции, предназначенные для последовательной обработки сообщений? вот мой код.

Я изменил реализацию на этот

    public static class KafkaReadMsgTask implements  Runnable{

    @Override
    public void run() {
        KakfaMsgConumerImpl  kakfaMsgConumerImpl=null;;
        try{
            kakfaMsgConumerImpl=SpContext.getBean(KakfaMsgConumerImpl.class);
            kakfaMsgConumerImpl.pollFormDef();
            kakfaMsgConumerImpl.pollFormData();
      } catch (Exception e){
          logger.error(" kafka listener  errors "+e);
          kakfaMsgConumerImpl.pauseTask();
       }  
    }
}


    @Component
public static class KakfaMsgConumerImpl {

    @Autowired
    ObjectMapper  mapper;

    @Autowired
    FormSink  formSink;

    @Autowired
    Environment  env;

    @Resource(name="formDefConsumer")
    Consumer formDefConsumer;

    @Resource(name="formDataConsumer")
    Consumer formDataConsumer;

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    public void startPolling() throws Exception{
        executor.scheduleAtFixedRate(new KafkaReadMsgTask(),10, 3,TimeUnit.SECONDS);
    }
    public void pauseTask(){
        try{
            Thread.sleep (120000l);
        }catch(Exception e){
            throw new RuntimeException(e);
        }
    }
    public void  pollFormDef() throws Exception{
        ConsumerRecords<Long, String> records =formDefConsumer.poll(0);
        if(!records.isEmpty()){
            int recordsCount=records.count();
            if(logger.isDebugEnabled()){
                logger.debug(" form-def consumer poll records size "+recordsCount);
            }
            if(records.count()>1){
                logger.warn(" form-def consumer poll returned records more than 1 , expected 1 , received "+recordsCount);
            }
            ConsumerRecord<Long,String> record= records.iterator().next();
            processFormDef(record.key(), record.value());
        }
    }
    void pollFormData() throws Exception{
        ConsumerRecords<Long, String> records =formDataConsumer.poll(0);
        if(!records.isEmpty()){
            int recordsCount=records.count();
            if(logger.isDebugEnabled()){
                logger.debug(" form-data consumer poll records size "+recordsCount);
            }
            if(records.count()>1){
                logger.warn(" form-data consumer poll returned records more than 1 , expected 1 , received "+recordsCount);
            }               ConsumerRecord<Long,String> record= records.iterator().next();
            processFormData(record.key(), record.value());
        }
    }
    void processFormDef(Long key, String msg) throws  Exception{
        if(logger.isDebugEnabled()){
            logger.debug(" key "+key+" payload : "+msg);
        }
        FormDefinition  formDefinition= mapper.readValue(msg, FormDefinition.class);
        formSink.createFromDef(formDefinition);
        logger.debug(" processed  message,  key: "+key+ " msg : "+msg);
        Thread.sleep(60000l);
    }

    void processFormData(Long key, String msg) throws  Exception{
        if(logger.isDebugEnabled()){
            logger.debug(" key "+key+" payload : "+msg);
        }
        FormData  formData= mapper.readValue(msg, FormData.class);
        formSink.persists(formData);
        logger.debug(" processed  message,  key: "+key+ " msg : "+msg);
        Thread.sleep(60000l);
    }



}

1 Ответ

0 голосов
/ 29 июня 2018

Использование контейнера слушателя, управляемого сообщениями, не является подходящей технологией для этого приложения; похоже, что вы хотите получать сообщения попеременно из двух разных тем.

Кроме того, остановка контейнера в потоке потребителя в любом случае не вступит в силу, пока поток не выйдет из метода, и в этот момент потребитель будет закрыт.

Я бы предложил вам использовать фабрику потребителей для создания двух потребителей; подпишитесь на темы, установите max.poll.records для каждого в 1 и вызовите метод poll() для каждого поочередно.

...