Как отправить сообщение в восходящий поток, используя реактивный поток / моно всякий раз, когда они готовы, чем опрос в интервале для статуса? - PullRequest
0 голосов
/ 12 января 2019

Попытка отправить сообщение в восходящий поток всякий раз, когда они доступны / готовы, и закрыть соединение после сброса, а не опрашивать сообщение, используя интервал реактивного потока пружины.

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {

return Flux.<String>interval(Duration.ofSeconds(3))
                .map(status -> {
                    if (getSomething(randomId).
                            equalsIgnoreCase("value"))
                        return "value";
                    return "ping";
                }).take(Duration.ofSeconds(60)).timeout(Duration.ofSeconds(60));
    }

Слушатель Кафки обновляет значение randomId на карте по мере его получения, метод getSomething проверяет значение randomId через определенные интервалы на карте. Поэтому вместо того, чтобы проверять интервалы и сохранять данные на карте, я хочу отправить сообщение клиенту, когда получатель получит сообщение.

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Я построил решение на основе этого стекового потока Spring 5 Web Reactive - Горячая публикация - Как использовать EmitterProcessor для соединения MessageListener с потоком событий , использовал EmitterProcessor для горячей публикации сообщений, когда они доступны.

Вот пример кода

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
    EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
    Flux<String> autoConnect = emitterProcessor.publish().autoConnect();
    FluxSink<String> sink = emitterProcessor.sink();
    //storing randomId and processor sink details
    randomIdMap.putIfAbsent(randomId, emitterProcessor);
    /** This will return ping status to notify client as 
    connection is alive until the randomId message received. **/
    sendPingStatus(sink, randomId);
}

Метод ниже показывает, как отправить сообщение клиенту, когда оно приходит на потребителя kafka и закрывает соединение потока.

@KafkaListener(topics = "some-subscription-id",
        containerFactory = "kafkaListenerContainerFactory")
public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {
    EmitterProcessor emitter = randomIdMap.get("randomId");
    if (emitter != null ) {
        emitter.onNext(message);
        emitter.onComplete();
        randomIdMap.remove("randomId");
        acknowledgment.acknowledge();
    }
}
0 голосов
/ 14 января 2019

Похоже на Flux.create() запрос:

return Flux.<String>create(emitter -> {
     if (getSomething(randomId).equalsIgnoreCase("value")) {
          sink.next("value");
     }
     else {
          sink.next("puing");
     }
  });

/**
 * Programmatically create a {@link Flux} with the capability of emitting multiple
 * elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
 * This includes emitting elements from multiple threads.
 * <p>
 * <img class="marble" src="doc-files/marbles/createForFlux.svg" alt="">
 * <p>
 * This Flux factory is useful if one wants to adapt some other multi-valued async API
 * and not worry about cancellation and backpressure (which is handled by buffering
 * all signals if the downstream can't keep up).
 * <p>
 * For example:
 *
 * <pre><code>
 * Flux.&lt;String&gt;create(emitter -&gt; {
 *
 *     ActionListener al = e -&gt; {
 *         emitter.next(textField.getText());
 *     };
 *     // without cleanup support:
 *
 *     button.addActionListener(al);
 *
 *     // with cleanup support:
 *
 *     button.addActionListener(al);
 *     emitter.onDispose(() -> {
 *         button.removeListener(al);
 *     });
 * });
 * 
* * @ реактор.discard {@link FluxSink}, предоставляемый этим оператором, буферизует в случае * переполнение. Буфер отбрасывается, когда основная последовательность отменяется. * * @param Тип значений в последовательности * @param emitter Потребляет {@link FluxSink}, предоставляемый Reactor для каждого подписчика, для генерации сигналов. * @return a {@link Flux} * @see #push (Consumer) * / public static Flux create (Consumer <? super FluxSink <T>> эмиттер) {
...