Весенние данные с JPA 2.2 resultStream в поток Котлина - PullRequest
1 голос
/ 04 ноября 2019

До JPA 2.2, если я хочу передать ScrollableResults на Flow Котлина, я должен сделать так:

  override fun findSomeUsers(batch: Int): Flow<User> {
    return flow {
      (em.delegate as Session).sessionFactory.openSession().use { session ->
        val query = session.createQuery("select u from User u where ...")
        query.fetchSize = batch
        query.isReadOnly = true

        query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
          while (results.next()) {
            val u = results.get(0) as User
            emit(u)
          }
        }
      }
    }
  }

Я должен понизить EntityManager до Session в Hibernate.

Но поскольку JPA 2.2 Query поддерживает getResultStream, должен быть более понятный способ достижения этого:

  @ExperimentalCoroutinesApi
  override fun findSomeUsers(batchSize: Int): Flow<User> {
    return channelFlow {
      em.createQuery("select u from User u where ...")
        .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
        .unwrap(javax.persistence.Query::class.java)
        .resultStream
        .asSequence()
        .map { it as User }
        .forEach { u ->
          runBlocking {
            send(u)
          }
        }
    }
  }

Ну, это работает хорошо, но что-то подозрительно.

Во-первых, почему я не могу просто набрать resultStream.asSequence.map {it as User}.asFlow()? (На стороне клиента ничего не происходит)

Во-вторых, блок runBlocking ужасен. runBlocking следует использовать только в тесте. Но я не нашел способа обойти это в коде.

Есть ли способ решить это?

В-третьих, это не связано с вопросом. Кажется, Spring-Data-JPA до сих пор не поддерживает такой метод запроса:

  @Query("select u from User u where ...") 
  @MaybeSomeQueryHint(batchSize=:batchSize)
  fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>

Он загружает всех пользователей, затем жалуется на дублированные строки ...

Клиентский (тестовый) код стороныпросто:

  @ExperimentalCoroutinesApi
  @Test
  @Transactional
  open fun testUsers() {
    runBlocking {
      userDao.findSomeUsers(100).collectIndexed { index, u: User ->
        logger.info("[{}] {}", index , u)
      }
    }
  }

Для @Marko версия Stream работает хорошо:

  override fun findSomeUserStream(batchSize: Int): Stream<User> {
    return em.createQuery("select u from User u where ...")
      .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
      .unwrap(javax.persistence.Query::class.java)
      .resultStream
      .map { it as User }
  }


  @Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
  @Test
  open fun testUserStream() {
    runBlocking {
      userDao.findSomeUserStream(100).forEach { u ->
        logger.info("{}" , u)
      }
    }
  }


  // it works !!
  @Transactional
  @Test
  open fun testUserStream2() {
    runBlocking {
      userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
        logger.info("{}" , u)
      }
    }
  }

1 Ответ

1 голос
/ 04 ноября 2019

Вместо исправления результата Stream.toSequence() определите это преобразование Stream в Flow:

fun <T> Stream<T>.asFlow() = flow {
    for (t in iterator()) {
        emit(t)
    }
}

Если вы используете его с этим примером кода:

suspend fun main() {
    Stream.of("a", "b")
            .asFlow()
            .collect { println(it) }
}

будет напечатано

a
b

Ваша функция должна выглядеть следующим образом:

override fun findSomeUsers(batchSize: Int): Flow<User> {
    return em.createQuery("select u from User u where ...")
            .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
            .unwrap(javax.persistence.Query::class.java)
            .resultStream
            .asFlow()
            .map { it as User }
}
...