Резюме потребителя кафки, не работающего в сценарии с несколькими связующими - PullRequest
0 голосов
/ 02 мая 2019

У меня есть требование приостановить работу потребителя Kafka, и после определенного интервала оно должно возобновиться. Приостановка работы потребителя Kafka работает, как и ожидалось, но проблема связана с Resume of Consumer.ListenerContainerIdleEvent не запускается после фиксированного интервала времени в сценарии с несколькими связывателями.Тот же код работает, если настроен только для одного подшивки.

Версия: Greenwich.SR1

Код реализации паузы:

private void pauseConsumer(ErrorMessage message)
      {
        Message<?> failedMessage = message.getOriginalMessage();
        Consumer consumer (Consumer)failedMessage.getHeaders().get("kafka_consumer");
        consumer.pause(consumer.assignment());
      }

Резюме Код реализации:

@Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event + " paused:" + event.getConsumer().paused());
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }

application.yml:

server:
  port: 7778
spring:
  cloud:
    stream:
      binders:
        kafka_binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream: 
                  kafka:
                    binder:
                      brokers: localhost:9092
      bindings:
        input:  
          destination: EMPLOYEE-TOPIC-R1-P1
          binder: kafka_binder
          contentType: application/json
          group: so51247113

Ответы [ 2 ]

0 голосов
/ 02 мая 2019

У меня нет проблем с получением событий простоя при использовании именованного связывателя ...

spring.cloud.stream.binders.mykafka.type=kafka

spring.cloud.stream.bindings.input.binder=mykafka

и

    @StreamListener(Sink.INPUT)
    public void foo(String in) {    
        System.out.println(in);
    }

    @EventListener
    public void events(KafkaEvent event) {
        System.out.println(event);
    }

и

ListenerContainerIdleEvent [idleTime=30.017s, listenerId=[Ljava.lang.String;@2b214b94.container-0, container=KafkaMessageListenerContainer [id=[Ljava.lang.String;@2b214b94.container-0, clientIndex=-0, topicPartitions=[input-0]], paused=false, topicPartitions=[input-0]]
ListenerContainerIdleEvent [idleTime=60.037s, listenerId=[Ljava.lang.String;@2b214b94.container-0, container=KafkaMessageListenerContainer [id=[Ljava.lang.String;@2b214b94.container-0, clientIndex=-0, topicPartitions=[input-0]], paused=false, topicPartitions=[input-0]]
ListenerContainerIdleEvent [idleTime=90.056s, listenerId=[Ljava.lang.String;@2b214b94.container-0, container=KafkaMessageListenerContainer [id=[Ljava.lang.String;@2b214b94.container-0, clientIndex=-0, topicPartitions=[input-0]], paused=false, topicPartitions=[input-0]]
ListenerContainerIdleEvent [idleTime=120.072s, listenerId=[Ljava.lang.String;@2b214b94.container-0, container=KafkaMessageListenerContainer [id=[Ljava.lang.String;@2b214b94.container-0, clientIndex=-0, topicPartitions=[input-0]], paused=false, topicPartitions=[input-0]]
ListenerContainerIdleEvent [idleTime=150.091s, listenerId=[Ljava.lang.String;@2b214b94.container-0, container=KafkaMessageListenerContainer [id=[Ljava.lang.String;@2b214b94.container-0, clientIndex=-0, topicPartitions=[input-0]], paused=false, topicPartitions=[input-0]]
0 голосов
/ 02 мая 2019

Вам не нужно делать ничего из этого, поскольку framework предоставляет конечную точку для управления жизненным циклом каждой привязки, включая паузу / возобновление. Пожалуйста, обратитесь к этому разделу руководства пользователя. Как правило, вы можете вызвать паузу / возобновить вызов REST

curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...