Как отменить подписку на Реактивный поток в Vert.x - PullRequest
0 голосов
/ 09 октября 2019

Я использую ReactiveStreams для публикации событий SSE в Vert.x:

ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
        .map(l -> String.format("Number of Customer added %s .%n",
                customerRepository.findAll().size() + " "))
        .buildRs();

Можно ли отменить подписку на потоковую передачу данных? Спасибо

1 Ответ

1 голос
/ 10 октября 2019

Метод buildRs() возвращает Publisher:

Publisher<String> publisher = ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
  .map(l -> String.format("Number of Customer added %s .%n",
    customerRepository.findAll().size() + " "))
  .buildRs();

Когда вы подписываетесь на этот Publisher, вы можете сохранить ссылку на Subscription и затем отменить излучение, когда вы закончите. :

publisher
  .subscribe(new Subscriber<String>() {
    volatile Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
      this.subscription = subscription;
    }

    @Override
    public void onNext(String s) {
      // when no more event is needed
      subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
      // handle error
    }

    @Override
    public void onComplete() {
      // handle complete
    }
  });
...