Я использую Spring Boot для создания приложения обработки данных по расписанию. Основной лог c будет в запланированном задании, которое принимает пакет записей и обрабатывает их. Я должен запустить 2 экземпляра приложения, которые не должны выбирать одну и ту же запись дважды. Я пытался использовать PESSIMISTI C LOCK без ожидания, чтобы разрешить любой конфликт выбора записей. Вещи не работают, как ожидалось. Оба экземпляра выбирают одни и те же записи, хотя я ожидал, что только один экземпляр заблокирует и обработает несколько записей, а другой пропустит то, что было заблокировано первым экземпляром. Версия Spring Boot: 2.2.4.RELEASE
База данных: MySQl
Сначала я попробовал использовать аннотации @Lock и @QueryHint:
@Lock(value = LockModeType.PESSIMISTIC_WRITE) // adds 'FOR UPDATE' statement
@QueryHints(value={@QueryHint(name = "javax.persistence.lock.timeout", value = LockOptions.SKIP_LOCKED+"")})
Page<Transaction> findByStatus(String status, Pageable pageable);
Даже с WAIT_FOREVER , в поведении нет никаких изменений, как будто @QueryHints полностью игнорируются. Другой вариант, который я пробовал, использует NativeQuery:
@Query(value ="select * from transaction t where t.status = ?1 limit ?2 for update SKIP LOCKED",
countQuery="select count(*) from transaction t where t.status = ?1",
nativeQuery = true)
List<Transaction> findByStatusNQ(String status, Integer pageSize);
То же поведение. Без блокировки, оба экземпляра приложения выбирают один и тот же набор данных. Это определенная сущность:
@Entity
public class Transaction {
@Id
private Long id;
private String description;
private String status;
private String managedBy;
@Temporal(TemporalType.TIMESTAMP)
private Date manageDate;
...
}
Компонент службы вызывающей стороны помечен @Transactional для принудительного создания новой транзакции для каждого выполнения:
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<Transaction> updateTrxStatus(String oldStatus,String newStatus){
List<Transaction> trxs = this.executeUsingNQ(oldStatus);
if(trxs.size()>0) {
logger.info( "Start updating Data");
trxs.forEach(transaction -> {
transaction.setStatus(newStatus);
transaction.setManagedBy(instanceName);
transaction.setManageDate(new Date(System.currentTimeMillis()));
});
}else{
logger.info(" Nothing to process");
}
return trxs;
}
@Transactional(propagation = Propagation.REQUIRED)
public List<Transaction> executeUsingNQ(String oldStatus){
List<Transaction> trxs = trxRepo.findByStatusNQ(oldStatus,2);
return trxs;
}
@Transactional(propagation = Propagation.REQUIRED)
public List<Transaction> executeWithPage(String oldStatus){
Pageable firstPageWithTwoElements = PageRequest.of(0, 2);
Page<Transaction> trxs = trxRepo.findByStatus(oldStatus, firstPageWithTwoElements);
return trxs.getContent();
}
Надеюсь, кто-нибудь может помочь определить, есть ли какая-то проблема с кодированием или отсутствует конфигурация !!!!