Разделение сообщения потокового потока веб-клиента на массивы JSON - PullRequest
1 голос
/ 04 апреля 2019

Я использую сторонний REST-контроллер, который принимает массив объектов JSON и возвращает ответ одного объекта.Когда я POST из WebClient, используя ограниченный Flux код работает (я предполагаю, потому что Flux завершается).

Однако, когда Flux потенциально неограничен, как я;

  1. POST в кусках массивов?
  2. Захватить ответ для каждого POST-массива?
  3. Остановить передачу Flux?

Вот мой бин;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

Вот как я предполагаю, что сторонний клиент выглядит так:

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

А вот мой код.Когда я POST flux2, по завершении отправляется массив JSON.Однако, когда я POST flux1, после первого take(5) ничего не отправляется.Как сделать следующие куски 5?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}

1 Ответ

3 голосов
/ 05 апреля 2019
  1. Как я могу POST в кусках массивов?

Используйте один из вариантов Flux.window для разделения основного потокав оконные потоки, а затем отправлять запросы с помощью оконных потоков через .flatMap

        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

Как мне захватить ответ для каждого POST-массива?

Вы можете проанализировать ответ через операторы после .exchange().

В приведенном мной примереответ можно увидеть в операторе doOnNext, но вы можете использовать любой оператор, который работает с onNext сигналами, например map или handle.

. Обязательно полностью прочитайте тело ответа.чтобы убедиться, что соединение возвращено обратно в пул (см. примечание ).Здесь я использовал .bodyToMono, но любой метод .body или .toEntity будет работать.

Остановить передачу Flux?

При использовании метода subscribe, как вы сделали, вы можете остановить поток, используя возвращенный disposable.dispose().

В качестве альтернативы, вы можете вернуть Flux из метода sendCars() и делегировать подписку и удаление вызывающей стороне.

Обратите внимание, что в приведенном мною примере я просто использовал Thread.sleep() для имитации ожидания.В реальном приложении вы должны использовать что-то более продвинутое и избегать Thread.sleep()

...