Spring WebFlux: тестирование событий отправки сервером WebClient - PullRequest
2 голосов
/ 08 мая 2019

На самом деле я даже не уверен, возможно ли сделать то, что я хочу: я использую поток событий (Event.class) от брокера (MQTT), используя Spring Integration, и хочу перенаправить поток в другой микросервис.Оба сервиса должны быть горизонтально масштабируемыми.Таким образом, в другом сервисе у меня есть конечная точка POST, которая в качестве тела использует Flux<Event> (используя Spring Cloud Functions: spring-cloud-starter-function-webflux).В службе пересылки я хочу использовать Ленту и WebClient.

Что у меня есть (упрощенно):

@Configuration
public class EventFlowConfiguration{ 

@Autowired
private Webclient webClient;

@Bean
public MessageProducerSupport inboundAdapter() {
    final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
    adapter.setConverter(new DefaultPahoMessageConverter());
    return adapter;
}

@Bean
Publisher<Message<Event>> eventFlow() {
    return IntegrationFlows
            .from(inboundAdapter())
            .handle((payload, header) -> this.eventHandler((String) payload))
            .toReactivePublisher();
}

private Event eventHandler(String payload) {
    // parsing, store in database ...
    return event;
}

@PostConstruct
public void setTrigger() {
    buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}

private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
    webClient
            .post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(
                    Flux.from(publisher)
                            .retry()
                            .map(Message::getPayload)
                    ,
                    eventClass
            )
            .retrieve()
            .bodyToMono(Void.class)
            .retry()
            .subscribe();
}
}

Мои вопросы:

(1)Есть ли шанс проверить это вообще?Как бы я это сделал?Что касается OkHttp, WireMock, внутреннего тестового загрузочного приложения в тесте, которое имеет /eventReceiverEndpoint, но просто утверждает.Тем не менее, на самом деле не получил нигде.На самом деле я даже не получаю ошибку.Я бы ожидал 4xx, так как при запуске сервера no memeory нет.

(2) Два вызова retry() действительно повторяют проблемы соединения с обеих сторон?Я теряю сообщения в случае отключения?Как я могу это проверить?

(3) Может ли лента в сочетании с этим долгоживущим Flux все еще выполнять балансировку нагрузки после установления соединения (например, когда нагрузка конечного потребителя достигает высокой или около того)).Я полагаю, что это должно было бы автоматически повторно подключить Flux к другому экземпляру?

...