Выход из пулов с использованием CompleteableFuture и Spring Transaction - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь быстро загрузить базу данных в память, используя CompleteableFutures.Я запускаю транзакцию Spring на уровне метода:

@Transactional()
    private void loadErUp() {
        StopWatch sw = StopWatch.createStarted();
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);

        for (long i = 1; i < 12 + 1; i++) {
            Long holder = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                this.loadPartition(holder, zdt);
            }, this.forkJoinPool);
            calls.add(future);
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
    }

, а затем присоединяю каждый поток к основной транзакции через

TransactionSynchronizationManager.setActualTransactionActive (true);

private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        log.debug("Refresh thread start:{}", partitionKey);
        TransactionSynchronizationManager.setActualTransactionActive(true);
        StopWatch sw = StopWatch.createStarted();

        try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
            long count = authorityStream.peek(a -> {
                this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
            }).count();
            log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
            return count;
        }
    }

Таким образом, я запускаю это пакетное задание каждые 30 секунд, и в 9-м запуске я получаю 4 потока, а затем он зависает (12 * 8 работает = 96), потому что он ждет открытия пула.Я получаю:

Невозможно получить соединение JDBC;Невозможно получить соединение в течение 30 секунд, недоступно [размер: 100;занят: 100;холостой ход: 0;lastwait: 30000].

Итак, очевидно, что соединения не фиксируются.Я подумал, что это может быть потому, что у меня есть свой собственный ForkJoinPool, однако я отключил все эти потоки, и это, похоже, не помогло.Я также поместил другой метод в метод loadPartition (), но, похоже, он тоже не помог.Есть еще один поток, в котором говорится о том, как заставить транзакции работать, но мои работы они просто не фиксируют.

1 Ответ

0 голосов
/ 14 декабря 2018

Если вы хотите, чтобы каждый #loadPartition выполнялся в своем собственном потоке и в своей собственной транзакции, вам необходимо:

  1. Пометить #loadPartition как @Transactional
  2. Вызовите метод прокси #loadPartition, чтобы @Transactional работал.Вы можете сделать это либо с помощью автоматического подключения, либо вызвав метод из другого прокси-класса

Транзакция не распространяется на асинхронные потоки, поскольку ( важно! ) этот метод не получает прокси .

Таким образом, он будет выглядеть так:

@Component
public class MyLoaderClass {

    // Autowire in this with constructor injection or @Autowired
    private MyLoaderClass myLoaderClass;

    // Removed @Transactional annotation
    public void loadErUp() {
        myLoaderClass.loadPartition(holder, zdt);
        ...
    }

    // 1) Add the @Transactional annotation to #loadPartition
    // 2) Make public to use self-autowiring (or refactored class, per link above)
    @Transactional
    public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        ...
        // Can remove TransactionSyncManager call
        ...
    }

}

Вы также захотите убедиться, что ваше пакетное задание не запускается, не убедившись, чтопоследняя работа завершена.Вы можете легко решить эту проблему с помощью , используя аннотацию @Scheduled для загрузки таблицы, чтобы прогоны не "перекрывались".

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...