Spring boot многопоточный asyn c не работает - PullRequest
0 голосов
/ 03 февраля 2020

Задача - вызвать базу данных, получить обновления определенных записей и сохранить их. Поскольку количество записей достаточно велико, мы хотим сделать это Asyn c, однако, это, кажется, не реализовано правильно.

Основной класс:

@SpringBootApplication
@EnableAsync
MainApplication() {

    @Bean("threadPoolExecutor")
    public TaskExecutor getAsyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setThreadNamePrefix("RetryEnhancement-");
            return executor;
        }
    }

Метод в первом сервисе:

@Service
public class FirstService() {
    @Transactional
    public void fullProcess() {
        for(int counter = 0; counter < ConfigFile.getTHREADS(); counter++){
            secondaryService.threads();
         }
    }
}

метод во втором сервисе:

@Service
public class SecondService () {
    @Async("threadPoolExecutor")
    public void threads() {
        while(thirdService.threadMethod()) {
            //doNothing
        }
    }
}

метод в третьем сервисе:

@Service
public class ThirdService() {
    @Transactional
    public boolean threads() {
        Record record = repository.fetchRecord();
        if(record!=null) {
            updateRecord(record);
            saveRecord(record);
            return true;
        } else {
            return false;
        }
    }
}

Репозиторий:

public interface repository extends CrudRepository<Record, long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Record fetchRecord();
}

Проблема, которую я обнаружил, заключается в том, что, хотя код выполняется отлично, похоже, он выполняет синхронное выполнение (обнаруживается путем добавления .sleep и отслеживания выполнения в регистраторе). Отдельные потоки, кажется, ожидают, пока другой не будет выполнен. Возможно, я делаю что-то не так, и если другой поток уже объясняет проблему, чем, пожалуйста, обратитесь к ней, хотя я не смог найти эту проблему в другом потоке.

1 Ответ

1 голос
/ 03 февраля 2020

Ваше решение является сложным. Отбросьте все это и просто введите TaskExecutor и выполните updateRecord в отдельном потоке (вам может потребоваться восстановить его, так как вы сейчас используете другой поток и, следовательно, соединение.

Примерно так следует выполнить трюк

private final TaskExecutor executor; // injected through constructor

public void process() {
  Stream<Record> records = repository.fetchRecords(); // Using a stream gives you a lazy cursor!
  records.forEach(this::processRecord);
}

private void processRecord(Record record) {
  executor.submit({
    updateRecord(record);
    saveRecord(record);
  });
}

Возможно, вы захотите поместить processRecord в другой объект и сделать его @Transactional или обернуть его в TransactionTemplate, чтобы получить такое поведение.

...