Остановить прослушивание Spring Cloud Stream @StreamListener до получения какого-либо события Spring - PullRequest
0 голосов
/ 30 августа 2018

Я работаю над Camunda BPM Spring Boot Application. Приложение читает сообщения из очереди rabbitmq, используя Spring Cloud Stream. Как только сообщение получено, приложение вызывает экземпляр процесса в Камунде.

Если во время запуска приложения уже есть сообщения в очереди rabbitmq, прослушиватель облачного потока начинает читать сообщения еще до инициализации Camunda.

Можно ли остановить прослушиватель облачного потока от прослушивания очереди, пока не произойдет какое-либо событие - в данном случае PostDeployEvent.

Я создал образец приложения для справки. https://github.com/kpkurian/spring-cloud-stream-camunda

Спасибо !!

Ответы [ 2 ]

0 голосов
/ 23 октября 2018

Spring cloud stream (kafka binder) добавлен метод паузы и возобновления работы потребителя

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

пожалуйста, проверьте документы https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples

но я думаю, что есть некоторая проблема с методом паузы: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS / Вы можете получить идентификатор раздела и название темы в примере прослушивателя:

  @StreamListener(Sink.INPUT)
  public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
  }

или в errorChannel global Listener

   @StreamListener("errorChannel")
  public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
    Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    // ?
    consumer.pause(Collections.singleton(p));
}
0 голосов
/ 17 сентября 2018

По предложению @ OlegZhurakousky

Выпуск

RuntimeService автоматически подключается, и к моменту запуска приложения предполагается, что все службы, компоненты и т. Д. Полностью инициализированы. Если он все еще проходит процесс инициализации и запуска, то он не реализован должным образом с точки зрения Spring идиом.

Решение

Оберните RuntimeService пользовательской реализацией жизненного цикла, которая не будет возвращаться до тех пор, пока не будет выполнен его метод start (), гарантирующий, что RuntmeService готов к работе.

Я реализовал это в примере приложения github

...