Как лучше всего выполнять 200 запросов в минуту к внешнему API в многопоточном приложении Spring Boot? - PullRequest
0 голосов
/ 04 апреля 2020

Итак, есть внешний сервер (игра). Там есть рынок. Много продуктов и их комбинаций. Их общее число составляет 2146.

Я хочу получать актуальную информацию о ценах время от времени.

Когда приложение запускается, я создаю 2146 задач, каждая из которых отвечает за свой тип продукта. Задачи запускаются в отдельном потоке с задержкой 2,5 секунды.

    @EventListener(ApplicationReadyEvent.class)
    public void start() {
        log.info("Let's get party started!");
        Set<MarketplaceCollector> collectorSet = marketplaceCollectorProviders.stream()
                .flatMap(provider -> provider.getCollectors().stream())
                .peek(this::subscribeOfferDBSubscriber)
                .collect(Collectors.toSet());
        collectors.addAll(collectorSet);
        runTasks();
    }

    private void subscribeOfferDBSubscriber(MarketplaceCollector marketplaceCollector) {
        marketplaceCollector.subscribe(marketplaceOfferDBSubscriber);
    }

    private void runTasks() {
        Thread thread = new Thread(() -> collectors.forEach(this::runWithDelay));
        thread.setName("tasks-runner");
        thread.start();
    }

    private void runWithDelay(Collector collector) {
        try {
            collector.collect();
            Thread.sleep(2_500);
            counter += 1;
        } catch (InterruptedException e) {
            log.error(e);
        }
        log.debug(counter);
    }

Используя RestTemplate, я получаю доступ к серверу. Если цена изменилась, эта задача будет выполнена снова через 1 минуту. Если цена остается прежней, добавьте одну минуту к ожиданию и снова сделайте запрос. Таким образом, если цена не изменится, максимальное время между запросами на один товар составит 20 минут. Я предполагаю, что мое приложение будет выполнять до 200 запросов в минуту, в противном случае я получу ошибку «слишком много запросов».

    @Override
    public void collect() {
        executorService.schedule(new MarketplaceTask(), INIT_DELAY, MILLISECONDS);
    }

    private MarketplaceRequest request() {
        return MarketplaceRequest.builder()
                .country(country)
                .industry(industry)
                .quality(quality)
                .orderBy(ASC)
                .currentPage(1)
                .build();
    }

    private class MarketplaceTask implements Runnable {
        private long MIN_DELAY = 60; // 1 minute
        private long MAX_DELAY = 1200; // 20 minutes

        private Double PREVIOUS_PRICE = Double.MAX_VALUE;
        private long DELAY = 0; // seconds

        @Override
        public void run() {
            log.debug(format("Collecting offer of %s %s in %s after %d m delay", industry, quality, country, DELAY / 60));
            MarketplaceResponse response = marketplaceClient.getOffers(request());
            subscribers.forEach(s -> s.onSubscription(response));
            updatePreviousPriceAndPeriod(response);
            executorService.schedule(this, DELAY, SECONDS);
        }

        private void updatePreviousPriceAndPeriod(MarketplaceResponse response) {
            if (response.isError() || response.getOffers().isEmpty()) {
                increasePeriod();
            } else {
                Double currentPrice = response.getOffers().get(0).getPriceWithTaxes();
                if (isPriceChanged(currentPrice)) {
                    setMinimalDelay();
                    PREVIOUS_PRICE = currentPrice;
                } else {
                    increasePeriod();
                }
            }
        }

        private void increasePeriod() {
            if (DELAY < MAX_DELAY) {
                DELAY += 60;
            }
        }

        private boolean isPriceChanged(Double currentPrice) {
            return !Objects.equals(currentPrice, PREVIOUS_PRICE);
        }

        private void setMinimalDelay() {
            DELAY = MIN_DELAY;
        }
    }
    public MarketplaceClient(@Value("${app.host}") String host,
                             AuthenticationService authenticationService,
                             RestTemplateBuilder restTemplateBuilder,
                             CommonHeadersComposer headersComposer) {
        this.host = host;
        this.authenticationService = authenticationService;
        this.restTemplateList = restTemplateBuilder.build();
        this.headersComposer = headersComposer;
    }

    public MarketplaceResponse getOffers(MarketplaceRequest request) {
        var authentication = authenticationService.getAuthentication();
        var requestEntity = new HttpEntity<>(requestString(request, authentication), headersComposer.getHeaders());
        log.debug(message("PING for", request));
        var responseEntity = restTemplate.exchange(host + MARKET_URL, POST, requestEntity, MarketplaceResponse.class);
        log.debug(message("PONG for", request));
        if (responseEntity.getBody().isError()) {
            log.warn("{}: {} {} in {}", responseEntity.getBody().getMessage(), request.getIndustry(), request.getQuality(), request.getCountry());
        }
        return responseEntity.getBody();
    }

    private String requestString(MarketplaceRequest request, Authentication authentication) {
        return format("countryId=%s&industryId=%s&quality=%s&orderBy=%s&currentPage=%s&ajaxMarket=1&_token=%s",
            request.getCountry().getId(), request.getIndustry().getId(), request.getQuality().getValue(), 
            request.getOrderBy().getValue(), request.getCurrentPage(), authentication.getToken());
  }

Но у меня есть проблема после нескольких минут приложения , Некоторые задачи перестают волновать. Запрос может go на сервер и не возвращаться. Однако другие задачи работают без проблем. Журналы, как он ведет себя (например):

2020-04-04 14:11:58.267  INFO 3546 --- [           main] c.g.d.e.harvesting.CollectorManager      : Let's get party started!
2020-04-04 14:11:58.302 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector  : Collecting offer of WEAPONS Q5 in GREECE after 0 m delay
2020-04-04 14:11:58.379 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient     : PING for: WEAPONS Q5 in GREECE
2020-04-04 14:11:59.217 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient     : PONG for: WEAPONS Q5 in GREECE
2020-04-04 14:12:00.805 DEBUG 3546 --- [   tasks-runner] c.g.d.e.harvesting.CollectorManager      : 1
2020-04-04 14:12:00.806 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector  : Collecting offer of WEAPONS Q4 in PAKISTAN after 0 m delay
2020-04-04 14:12:00.807 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient     : PING for: WEAPONS Q4 in PAKISTAN
2020-04-04 14:12:03.308 DEBUG 3546 --- [   tasks-runner] c.g.d.e.harvesting.CollectorManager      : 2
2020-04-04 14:12:03.309 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.harvesting.MarketplaceCollector  : Collecting offer of FOOD_RAW Q1 in SAUDI_ARABIA after 0 m delay
2020-04-04 14:12:03.311 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.market.api.MarketplaceClient     : PING for: FOOD_RAW Q1 in SAUDI_ARABIA
2020-04-04 14:12:05.810 DEBUG 3546 --- [   tasks-runner] c.g.d.e.harvesting.CollectorManager      : 3
2020-04-04 14:12:05.810 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector  : Collecting offer of WEAPONS Q5 in COLOMBIA after 0 m delay
2020-04-04 14:12:05.811 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient     : PING for: WEAPONS Q5 in COLOMBIA
2020-04-04 14:12:08.314 DEBUG 3546 --- [   tasks-runner] c.g.d.e.harvesting.CollectorManager      : 4
2020-04-04 14:12:08.315 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.harvesting.MarketplaceCollector  : Collecting offer of WEAPONS Q1 in CZECH_REPUBLIC after 0 m delay
2020-04-04 14:12:08.316 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.market.api.MarketplaceClient     : PING for: WEAPONS Q1 in CZECH_REPUBLIC
2020-04-04 14:12:10.818 DEBUG 3546 --- [   tasks-runner] c.g.d.e.harvesting.CollectorManager      : 5
@Configuration
public class BeanConfiguration {

  @Bean
  public ScheduledExecutorService scheduledExecutorService() {
    return Executors.newScheduledThreadPool(8);
  }
}

Я пытался изменить пул соединений для одного хоста, но я только сделал это хуже. Я даже создал 200 экземпляров RestTemplate, но со временем доступ к серверу прекратился.

Я бы не хотел использовать Spring Webflux для этой цели.

Что я должен сделать, чтобы сделать приложение работать как положено?

...