MongoDB реактивные шаблонные транзакции - PullRequest
0 голосов
/ 24 декабря 2018

Я использую mongodb для своего проекта с открытым исходным кодом уже более года, и недавно я решил опробовать транзакции.После написания нескольких тестов для методов, использующих транзакции, я понял, что они выдают странные исключения, и я не могу понять, в чем проблема.Итак, у меня есть метод delete, который использует пользовательский coroutine context и mutex:

  open suspend fun delete(photoInfo: PhotoInfo): Boolean {
    return withContext(coroutineContext) {
      return@withContext mutex.withLock {
        return@withLock deletePhotoInternalInTransaction(photoInfo)
      }
    }
  }

Затем он вызывает метод , который выполняет некоторое удаление:

  //FIXME: doesn't work in tests
  //should be called from within locked mutex
  private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty())

    val transactionMono = template.inTransaction().execute { txTemplate ->
      return@execute photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
        .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
        .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
    }.next()

    return try {
      transactionMono.awaitFirst()
      true
    } catch (error: Throwable) {
      logger.error("Could not delete photo", error)
      false
    }
  }

Здесь у меня есть пять операций, которые удаляют данные из пяти разных документов.Вот пример одной из операций:

open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
    val query = Query()
      .addCriteria(Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId))

    return template.remove(query, PhotoInfo::class.java)
      .map { deletionResult -> deletionResult.wasAcknowledged() }
      .doOnError { error -> logger.error("DB error", error) }
      .onErrorReturn(false)
  }

Я хочу, чтобы эта операция завершилась неудачей, если одно из удалений завершилось неудачно, поэтому я использую транзакцию.

Затем у меня есть несколько тестов для обработчика, который использует этот delete метод:

  @Test
  fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.DatabaseError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerResizeError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerDiskError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

Обычно первый тест проходит:

enter image description here

и следующие два терпят неудачу со следующим исключением:

17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017. 

И затем:

Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.

И:

17:22:36.951 [Thread-16] WARN  reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017. 

Иногда два из них проходят, и последнийодин сбой.

enter image description here

Похоже, что только первая транзакция прошла успешно, и любая следующая операция завершится неудачей, и я предполагаю, что причина в том, что я должен закрыть ее вручную(или ClientSession).Но я не могу найти информацию о том, как закрыть транзакции / сессии. Здесь - один из немногих примеров, которые я мог найти, где они используют транзакции с реактивным шаблоном, и я не вижу, чтобы они делали что-либо дополнительное, чтобы закрыть транзакцию / сессию.

Или, может быть, это из-за того, что я издеваюсь над методом создания исключения внутри транзакции?Может быть, это не закрывается в этом случае?

Ответы [ 2 ]

0 голосов
/ 31 декабря 2018

Сеансы / транзакции клиента закрыты должным образом, однако создается впечатление, что создание индексов в тестах приводит к глобальной блокировке, что приводит к отставанию следующей блокировки транзакции и ее ожиданию до истечения времени ожидания запроса на блокировку.

По сути, вы должны управлять созданием индекса, чтобы он не мешал транзакциям от клиента.

Одним из быстрых решений было бы увеличить время ожидания блокировки, выполнив в командной строке команду ниже.

db.adminCommand ({setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50})

В процессе работы вы можете посмотреть метку ошибки транзакции и повторить операцию.

Подробнее здесь https://docs.mongodb.com/manual/core/transactions-production-consideration/#pending-ddl-operations-and-transactions

0 голосов
/ 29 декабря 2018

Вы можете проверить соединение опции и предоставить вам драйвер

val connection = MongoConnection(List("localhost"))
val db = connection.database("plugin")
...
connection.askClose()

Вы можете найти метод askClose (), надеюсь, вы можете помочь

...