Как ограничить запрос / секунду с WebClient? - PullRequest
0 голосов
/ 17 мая 2018

Я использую объект WebClient для отправки запроса Http Post на сервер. Он отправляет огромное количество запросов довольно быстро (в QueueChannel около 4000 сообщений). Проблема ... кажется, что сервер не может ответить достаточно быстро ... поэтому я получаю много ошибок 500 сервера и преждевременное закрытие соединения.

Есть ли способ ограничить количество запросов в секунду? Или ограничить количество потоков, которые он использует?

РЕДАКТИРОВАТЬ:

Сообщение обработки конечной точки сообщения в QueueChannel:

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Класс обслуживания WebClient:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

Ответы [ 3 ]

0 голосов
/ 15 декабря 2018

Надеюсь, я не опаздываю на вечеринку. В любом случае, ограничение скорости запроса - лишь одна из проблем, с которыми я столкнулся неделю назад, когда создавал сканер. Вот вопросы:

  1. Я должен сделать рекурсивный, разбитый на страницы последовательный запрос. Параметры нумерации страниц включены в API, к которому я обращаюсь.
  2. После получения ответа сделайте паузу на 1 секунду перед выполнением следующего запроса.
  3. При определенных ошибках повторите попытку
  4. При повторной попытке сделать паузу на несколько секунд

Вот решение:

private Flux<HostListResponse> sequentialCrawl() {
    AtomicLong pageNo = new AtomicLong(2);
    // Solution for #1 - Flux.expand
    return getHosts(1)
        .doOnRequest(value -> LOGGER.info("Start crawling."))
        .expand(hostListResponse -> { 
            final long totalPages = hostListResponse.getData().getTotalPages();
            long currPageNo = pageNo.getAndIncrement();
            if (currPageNo <= totalPages) {
                LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
                // Solution for #2
                return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
                    getHosts(currPageNo)
                );
            }
            return Flux.empty();
        })
        .doOnComplete(() -> LOGGER.info("End of crawling."));
}

private Mono<HostListResponse> getHosts(long pageNo) {
    final String uri = hostListUrl + pageNo;
    LOGGER.info("Crawling " + uri);

    return webClient.get()
        .uri(uri)
        .exchange()
        // Solution for #3
        .retryWhen(companion -> companion
            .zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
                String message = "Failed to crawl uri: " + error.getMessage();
                if (index <= RETRY && (error instanceof RequestIntervalTooShortException
                    || error instanceof ConnectTimeoutException
                    || "Connection reset by peer".equals(error.getMessage())
                )) {
                    LOGGER.info(message + ". Retries count: " + index);
                    return Tuples.of(error, index);
                } else {
                    LOGGER.warn(message);
                    throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
                }
            })
            .map(tuple -> {
                // Solution for #4
                Throwable e = tuple.getT1();
                int delaySeconds = tuple.getT2();
                // TODO: Adjust these values according to your needs
                if (e instanceof ConnectTimeoutException) {
                    delaySeconds = delaySeconds * 5;
                } else if ("Connection reset by peer".equals(e.getMessage())) {
                    // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
                    delaySeconds = delaySeconds * 10;
                }
                LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
                return Mono.delay(Duration.ofSeconds(delaySeconds));
            })
            .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
        )
        .flatMap(clientResponse -> clientResponse.toEntity(String.class))
        .map(responseEntity -> {
            HttpStatus statusCode = responseEntity.getStatusCode();
            if (statusCode != HttpStatus.OK) {
                Throwable exception;
                // Convert json string to Java POJO
                HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
                // The API that I'm calling will return error code of 06 if request interval is too short
                if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
                    exception = new RequestIntervalTooShortException(uri);
                } else {
                    exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
                }
                throw Exceptions.propagate(exception);
            } else {
                return toHostListResponse(uri, statusCode, responseEntity.getBody());
            }
        });
}
0 голосов
/ 10 августа 2019

Я использую это для ограничения количества активных запросов:

public DemoClass(WebClient.Builder webClientBuilder) {
    AtomicInteger activeRequest = new AtomicInteger();
    this.webClient = webClientBuilder
            .baseUrl("http://httpbin.org/ip")
            .filter(
                    (request, next) -> Mono.just(next)
                            .flatMap(a -> {
                                if (activeRequest.intValue() < 3) {
                                    activeRequest.incrementAndGet();
                                    return next.exchange(request)
                                            .doOnNext(b -> activeRequest.decrementAndGet());
                                }
                              return Mono.error(new RuntimeException("Too many requests"));
                            })
                            .retryWhen(Retry.anyOf(RuntimeException.class)
                                    .randomBackoff(Duration.ofMillis(300), Duration.ofMillis(1000))
                                    .retryMax(50)
                            )
            )
            .build();
}

public Mono<String> call() {
    return webClient.get()
            .retrieve()
            .bodyToMono(String.class);
}
0 голосов
/ 17 мая 2018

Вопрос Ограничение скорости запросов с Reactor предоставляет два ответа (один в комментарии)

zip с другим потоком, который действует как ограничитель скорости

.zipWith (Flux.interval (Duration.of (1, ChronoUnit.SECONDS)))

просто задержать каждый веб-запрос

использовать delayElements функция

edit: ответ ниже действителен для блокировки RestTemplate, но не очень хорошо вписывается в реактивную модель.

WebClient не имеет возможности ограничить запрос, но вы можете легко добавить эту функцию, используя композицию.

Вы можете ограничить ваш клиент извне, используя RateLimiter из Гуавы / (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)

В этом уроке http://www.baeldung.com/guava-rate-limiter вы узнаете, как использовать ограничитель скорости блокирующим способом или с таймаутами.

Я бы украсил все вызовы, которые нужно регулировать, в отдельном классе,

  1. ограничивает количество вызовов в секунду
  2. выполняет реальный веб-вызов с использованием WebClient
...