Я использую mongodb для своего проекта с открытым исходным кодом уже более года, и недавно я решил опробовать транзакции.После написания нескольких тестов для методов, использующих транзакции, я понял, что они выдают странные исключения, и я не могу понять, в чем проблема.Итак, у меня есть метод delete
, который использует пользовательский coroutine context
и mutex
:
open suspend fun delete(photoInfo: PhotoInfo): Boolean {
return withContext(coroutineContext) {
return@withContext mutex.withLock {
return@withLock deletePhotoInternalInTransaction(photoInfo)
}
}
}
Затем он вызывает метод , который выполняет некоторое удаление:
//FIXME: doesn't work in tests
//should be called from within locked mutex
private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
check(!photoInfo.isEmpty())
val transactionMono = template.inTransaction().execute { txTemplate ->
return@execute photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
.flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
.flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
.flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
.flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
}.next()
return try {
transactionMono.awaitFirst()
true
} catch (error: Throwable) {
logger.error("Could not delete photo", error)
false
}
}
Здесь у меня есть пять операций, которые удаляют данные из пяти разных документов.Вот пример одной из операций:
open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
val query = Query()
.addCriteria(Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId))
return template.remove(query, PhotoInfo::class.java)
.map { deletionResult -> deletionResult.wasAcknowledged() }
.doOnError { error -> logger.error("DB error", error) }
.onErrorReturn(false)
}
Я хочу, чтобы эта операция завершилась неудачей, если одно из удалений завершилось неудачно, поэтому я использую транзакцию.
Затем у меня есть несколько тестов для обработчика, который использует этот delete
метод:
@Test
fun `photo should not be uploaded if could not enqueue static map downloading request`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.DatabaseError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
@Test
fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)
Mockito.doThrow(IOException("BAM"))
.`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.ServerResizeError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
@Test
fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)
Mockito.doThrow(IOException("BAM"))
.`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.ServerDiskError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
Обычно первый тест проходит:
и следующие два терпят неудачу со следующим исключением:
17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017.
И затем:
Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.
И:
17:22:36.951 [Thread-16] WARN reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017.
Иногда два из них проходят, и последнийодин сбой.
Похоже, что только первая транзакция прошла успешно, и любая следующая операция завершится неудачей, и я предполагаю, что причина в том, что я должен закрыть ее вручную(или ClientSession).Но я не могу найти информацию о том, как закрыть транзакции / сессии. Здесь - один из немногих примеров, которые я мог найти, где они используют транзакции с реактивным шаблоном, и я не вижу, чтобы они делали что-либо дополнительное, чтобы закрыть транзакцию / сессию.
Или, может быть, это из-за того, что я издеваюсь над методом создания исключения внутри транзакции?Может быть, это не закрывается в этом случае?