Spring Reactive с MongoDB дает сбой для более 1000 записей - PullRequest
0 голосов
/ 08 ноября 2018

У меня есть две службы Spring Boot, первая читает строку за строкой, преобразовывая ее в поток, и выполняет запрос POST:

webClient.post()
            .uri("/foobar/bulk")
            .contentType(APPLICATION_STREAM_JSON)
            .body(createFlux(), Foobar.class)
            .retrieve()
            .bodyToMono(Void.class)
            .subscribe();

Второй сервис получает Flux<Foobar> и сохраняет его в базе данных:

@ResponseStatus(HttpStatus.CREATED)
@PostMapping(value = "/foobar/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).then();
}

Но в mongo db сохраняются только ~ 1000 объектов, а затем происходит сбой (в первом сервисе) с помощью:

reactor.core.Exceptions$ErrorCallbackNotImplemented: 

org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
Caused by: org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:151) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.lambda$createResponseException$7(DefaultWebClient.java:466) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]

Не уверен, что это важно, но во втором журнале сервисов я вижу около 10 строк вроде:

[ntLoopGroup-2-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:7, serverValue:1424}] to localhost:27017

Если я увеличу maxPoolSize param для spring.data.mongodb.uri, я получу больше объектов в базу данных и больше журналов, как указано выше.

Я использую

  • spring-boot-starter-webflux:2.1.0.RELEASE'
  • spring-boot-starter-data-mongodb-reactive:2.1.0.RELEASE

Я что-то неправильно настроил или неправильно использую mongo / реактивный API?

1 Ответ

0 голосов
/ 09 ноября 2018

фактически все данные были вставлены в базу данных, но с ошибкой, которую я описал. Я немного изменил код и теперь все в порядке:

 webClient.post()
            .uri("/foobar/bulk")
            .contentType(APPLICATION_STREAM_JSON)
            .body(createFlux(), Foobar.class)
            .retrieve()
            .bodyToMono(Long.class)
            .subscribe(count -> log.info("items sent and received: " + count));

И получатель:

@PostMapping(value = "/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Long> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).count();
}
...