Как синхронизировать (блокировать) на объекте jpa? - PullRequest
0 голосов
/ 05 апреля 2019

Я пишу приложение, которое периодически извлекает новые строки (row.status == 'NEW') из таблицы базы данных, выполняет некоторую обработку каждой строки как сущность JPA, затем сохраняет строку обратно в базу данных со статусом =='ОБРАБОТАНО'.

Таблица БД:

ID | Status
1  | PROCESSED
2  | NEW
3  | NEW

Java-код: (с использованием среды загрузки Spring)

@Component
public class Processor {

   // an JPA repository for selecting Items
   @Autowired
   ItemRepository itemRepository;

   // a thread executor for submitting 
   ExecutorService executor = Executors.newSingleThreadExecutor();



   @Scheduled(fixed-rate=1000)
   void process() {
        List<Item> newItems = itemRepository.findByStatus('NEW');
        for(Item item : newItems) {
            // process each item asyncronously
            executor.submit(()-> {
                // do some processing on this item and update status.
                // THis is time consuming process, may take 4 or 5 seconds
                item.setStatus("PROCESSED");
                itemRepository.save(item);
            });
        }
   }

}

Проблема в том, что когда один элемент item1все еще обрабатывается в executor и не обновляется со статусом PROCESSED, в следующем раунде обработки он все еще будет выбран itemRepository.findByStatus('NEW').И он будет снова отправлен на обработку.

Как избежать такого случая?(кроме изменения fixed-rate на fixed-delay) Существует ли какой-то механизм блокировки, такой как syncronize (item) { .... }, такой, что, когда строка базы данных еще обрабатывается, она не выбирается снова в следующем раунде метода process()?

Ответы [ 4 ]

0 голосов
/ 06 апреля 2019

С моей точки зрения, эта проблема может быть решена с помощью @Transactional уровня изоляции чтения без связи.Обратитесь к этому вопросу: Spring @Transactional - изоляция, распространение

Дополнительным примечанием является добавление другого статуса ON_PROCESS в качестве флага для элементов, обрабатываемых в данный момент другими потоками, которые будут сохранены перед выполнениемваша обработка.Если выдается исключение, оно автоматически откатывается, но в результате вы сохраняете его в ОБРАБОТАННО.Ключевым моментом здесь является то, что до тех пор, пока статус не НОВЫЙ, он не будет выбран вашей запланированной задачей, пока вы читаете незафиксированные статусы.

0 голосов
/ 06 апреля 2019

Рассматривали ли вы иметь третий статус в вашем статусе? то есть PROCESSING - это может быть простой способ убедиться, что у вас нет двух потоков, пытающихся обработать один и тот же элемент, причем каждый поток только набирает NEW работы.

Я сделал нечто подобное, за исключением того, что используемый объект STATUS - это просто строковое поле. Чтобы зарезервировать работу, она становится UPDATE TOP 1 FROM table set status = status + :randomString WHERE status = 'NEW', а затем снова выберите ее, чтобы начать обработку.

0 голосов
/ 06 апреля 2019

Вам нужна структура данных бухгалтерия , чтобы отслеживать задачи, которые были отправлены исполнителю.Вы можете ввести новое состояние в объекте Item для отслеживания этого, но, учитывая частоту планирования и количество элементов, этот подход вводит множество отключений базы данных, которые могут снизить производительность.

ИспользованиеConcurrentHashMap для отслеживания Items, которые были отправлены исполнителю путем помещения идентификатора Item в карту.После сохранения Item удалите идентификатор Item с карты.Эта карта поможет вам быстро решить, следует ли отправлять Item исполнителю или нет.

Если элементы, возвращаемые методом findByStatus, огромны, вы можете использовать Redis * 1016.* или Memcached для отслеживания уже отправленных элементов.

0 голосов
/ 06 апреля 2019

Не думаю, что это легко сделать с помощью планировщика Spring. Также, если вы можете найти решение с одним экземпляром с некоторой синхронизацией в одной и той же JVM, это не удастся, если в кластере с несколькими JVM запущено несколько экземпляров. Вы можете перейти к Quartz , который может использовать базу данных (JDBC), чтобы одновременно выполнять только один экземпляр задания. Реализуйте org.springframework.scheduling.quartz.QuartzJobBean и добавьте его в настройку Spring.

Выполните поиск для spring boot 2 Quartz, как настроить это. Это заняло бы слишком много места, но это не так сложно. Начать можно с документации Spring .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...