У меня есть задача Spring Scheduled для вставки или обновления строк json для обеспечения кэшей запросов. Недавно я использовал многопоточность для повышения эффективности, но обнаружил CannotAcquireLockException . Я использую Spring + MyBatis рамки.
Я искал причину: некоторые люди говорят, что использование службы в другом сервисе может вызвать смертельные блокировки и вызвать такое исключение, другие говорят, что никакой индекс в таблице не может получить тайм-аут, когда вставка или обновление данных, таким образом, получают исключение. Объяснение, которому я доверяю больше всего, это мое использование ON DUPLICATE KEY UPDATE
в MySQL. Когда транзакция начинается, находит дубликаты и получает последние данные, она получает блокировку (я не знаю, какой это тип блокировки). Предположим, что другая транзакция также найдет эту строку и получит блокировку. Теперь при обновлении строки первая транзакция требует снятия блокировки последней, последняя также нуждается в разблокировке первой. Таким образом, возникает CannotAcquireLockException.
Но я использую ConcurrentLinkedQueue , чтобы сохранить индекс целевых данных и убедиться, что он не может получить одну и ту же строку в одно и то же время, почему по-прежнему возникает исключение? Или я ошибаюсь?
Задача:
@Scheduled(cron = "0 0/5 * * * ?")
public void executeJob() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
logger.info("@@@@@@@@@@start[" + sdf.format(new Date()) + "]@@@@@@@@@@");
Queue<CacheCondition> conditions = new ConcurrentLinkedQueue<>();
List<Integer> projectIds = this.getProjectListService().selectProjectIds();
for (String timeStr : TIME_LIST) {
for (Integer pid : projectIds) {
List<Integer> themeIds = this.getThemeService().selectThemeIds(pid);
themeIds.add(0, 0);
for (Integer tid : themeIds) {
conditions.offer(new CacheCondition(tid, timeStr, pid)); // (push)
}
}
}
for (int i = 0; i < 10; ++i) {
new Thread(new SubSyncIndexResultTask(conditions, projectNameDataService, indexCacheService)).start();
}
}
подзадачи (Thread):
@Override
public void run() {
while (conditions.size() > 0) {
logger.info("^^^^^^^^^^subTask runs^^^^^^^^^^");
CacheCondition cc = conditions.poll(); // (pop)
Integer tid = cc.getTid();
String timeStr = cc.getTimeStr();
Integer pid = cc.getPid();
Map<String, List<Map<String, Object>>> statistics = this.getProjectNameDataService()
.getIndexStatistics(tid, timeStr, pid);
this.getIndexCacheService().insertOrUpdateJson(pid, tid, timeStr, JSON.toJSONString(statistics));
logger.info("pid = " + pid + ", tid = " + tid + ", timeStr = " + timeStr);
}
}
Услуги:
public int insertOrUpdateJson(Integer projectId, Integer themeId, String timeString, String resultJson) {
resultJson.replace("'", "\\'");
StringBuffer sqlBuffer = new StringBuffer("INSERT INTO index_cache(project_id, theme_id, time_string, " +
"result_json, update_time) VALUES(").append(projectId).append(", ").append(themeId).append(", ")
.append("'").append(timeString).append("', '").append(resultJson).append("', NOW())")
.append(" ON DUPLICATE KEY UPDATE result_json = '").append(resultJson).append("', update_time")
.append(" = NOW()");
return updateJson(sqlBuffer.toString());
}