Итак, есть внешний сервер (игра). Там есть рынок. Много продуктов и их комбинаций. Их общее число составляет 2146.
Я хочу получать актуальную информацию о ценах время от времени.
Когда приложение запускается, я создаю 2146 задач, каждая из которых отвечает за свой тип продукта. Задачи запускаются в отдельном потоке с задержкой 2,5 секунды.
@EventListener(ApplicationReadyEvent.class)
public void start() {
log.info("Let's get party started!");
Set<MarketplaceCollector> collectorSet = marketplaceCollectorProviders.stream()
.flatMap(provider -> provider.getCollectors().stream())
.peek(this::subscribeOfferDBSubscriber)
.collect(Collectors.toSet());
collectors.addAll(collectorSet);
runTasks();
}
private void subscribeOfferDBSubscriber(MarketplaceCollector marketplaceCollector) {
marketplaceCollector.subscribe(marketplaceOfferDBSubscriber);
}
private void runTasks() {
Thread thread = new Thread(() -> collectors.forEach(this::runWithDelay));
thread.setName("tasks-runner");
thread.start();
}
private void runWithDelay(Collector collector) {
try {
collector.collect();
Thread.sleep(2_500);
counter += 1;
} catch (InterruptedException e) {
log.error(e);
}
log.debug(counter);
}
Используя RestTemplate, я получаю доступ к серверу. Если цена изменилась, эта задача будет выполнена снова через 1 минуту. Если цена остается прежней, добавьте одну минуту к ожиданию и снова сделайте запрос. Таким образом, если цена не изменится, максимальное время между запросами на один товар составит 20 минут. Я предполагаю, что мое приложение будет выполнять до 200 запросов в минуту, в противном случае я получу ошибку «слишком много запросов».
@Override
public void collect() {
executorService.schedule(new MarketplaceTask(), INIT_DELAY, MILLISECONDS);
}
private MarketplaceRequest request() {
return MarketplaceRequest.builder()
.country(country)
.industry(industry)
.quality(quality)
.orderBy(ASC)
.currentPage(1)
.build();
}
private class MarketplaceTask implements Runnable {
private long MIN_DELAY = 60; // 1 minute
private long MAX_DELAY = 1200; // 20 minutes
private Double PREVIOUS_PRICE = Double.MAX_VALUE;
private long DELAY = 0; // seconds
@Override
public void run() {
log.debug(format("Collecting offer of %s %s in %s after %d m delay", industry, quality, country, DELAY / 60));
MarketplaceResponse response = marketplaceClient.getOffers(request());
subscribers.forEach(s -> s.onSubscription(response));
updatePreviousPriceAndPeriod(response);
executorService.schedule(this, DELAY, SECONDS);
}
private void updatePreviousPriceAndPeriod(MarketplaceResponse response) {
if (response.isError() || response.getOffers().isEmpty()) {
increasePeriod();
} else {
Double currentPrice = response.getOffers().get(0).getPriceWithTaxes();
if (isPriceChanged(currentPrice)) {
setMinimalDelay();
PREVIOUS_PRICE = currentPrice;
} else {
increasePeriod();
}
}
}
private void increasePeriod() {
if (DELAY < MAX_DELAY) {
DELAY += 60;
}
}
private boolean isPriceChanged(Double currentPrice) {
return !Objects.equals(currentPrice, PREVIOUS_PRICE);
}
private void setMinimalDelay() {
DELAY = MIN_DELAY;
}
}
public MarketplaceClient(@Value("${app.host}") String host,
AuthenticationService authenticationService,
RestTemplateBuilder restTemplateBuilder,
CommonHeadersComposer headersComposer) {
this.host = host;
this.authenticationService = authenticationService;
this.restTemplateList = restTemplateBuilder.build();
this.headersComposer = headersComposer;
}
public MarketplaceResponse getOffers(MarketplaceRequest request) {
var authentication = authenticationService.getAuthentication();
var requestEntity = new HttpEntity<>(requestString(request, authentication), headersComposer.getHeaders());
log.debug(message("PING for", request));
var responseEntity = restTemplate.exchange(host + MARKET_URL, POST, requestEntity, MarketplaceResponse.class);
log.debug(message("PONG for", request));
if (responseEntity.getBody().isError()) {
log.warn("{}: {} {} in {}", responseEntity.getBody().getMessage(), request.getIndustry(), request.getQuality(), request.getCountry());
}
return responseEntity.getBody();
}
private String requestString(MarketplaceRequest request, Authentication authentication) {
return format("countryId=%s&industryId=%s&quality=%s&orderBy=%s¤tPage=%s&ajaxMarket=1&_token=%s",
request.getCountry().getId(), request.getIndustry().getId(), request.getQuality().getValue(),
request.getOrderBy().getValue(), request.getCurrentPage(), authentication.getToken());
}
Но у меня есть проблема после нескольких минут приложения , Некоторые задачи перестают волновать. Запрос может go на сервер и не возвращаться. Однако другие задачи работают без проблем. Журналы, как он ведет себя (например):
2020-04-04 14:11:58.267 INFO 3546 --- [ main] c.g.d.e.harvesting.CollectorManager : Let's get party started!
2020-04-04 14:11:58.302 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q5 in GREECE after 0 m delay
2020-04-04 14:11:58.379 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q5 in GREECE
2020-04-04 14:11:59.217 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PONG for: WEAPONS Q5 in GREECE
2020-04-04 14:12:00.805 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 1
2020-04-04 14:12:00.806 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q4 in PAKISTAN after 0 m delay
2020-04-04 14:12:00.807 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q4 in PAKISTAN
2020-04-04 14:12:03.308 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 2
2020-04-04 14:12:03.309 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of FOOD_RAW Q1 in SAUDI_ARABIA after 0 m delay
2020-04-04 14:12:03.311 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.market.api.MarketplaceClient : PING for: FOOD_RAW Q1 in SAUDI_ARABIA
2020-04-04 14:12:05.810 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 3
2020-04-04 14:12:05.810 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q5 in COLOMBIA after 0 m delay
2020-04-04 14:12:05.811 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q5 in COLOMBIA
2020-04-04 14:12:08.314 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 4
2020-04-04 14:12:08.315 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q1 in CZECH_REPUBLIC after 0 m delay
2020-04-04 14:12:08.316 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q1 in CZECH_REPUBLIC
2020-04-04 14:12:10.818 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 5
@Configuration
public class BeanConfiguration {
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(8);
}
}
Я пытался изменить пул соединений для одного хоста, но я только сделал это хуже. Я даже создал 200 экземпляров RestTemplate, но со временем доступ к серверу прекратился.
Я бы не хотел использовать Spring Webflux для этой цели.
Что я должен сделать, чтобы сделать приложение работать как положено?