До JPA 2.2, если я хочу передать ScrollableResults
на Flow
Котлина, я должен сделать так:
override fun findSomeUsers(batch: Int): Flow<User> {
return flow {
(em.delegate as Session).sessionFactory.openSession().use { session ->
val query = session.createQuery("select u from User u where ...")
query.fetchSize = batch
query.isReadOnly = true
query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
while (results.next()) {
val u = results.get(0) as User
emit(u)
}
}
}
}
}
Я должен понизить EntityManager
до Session
в Hibernate.
Но поскольку JPA 2.2 Query
поддерживает getResultStream
, должен быть более понятный способ достижения этого:
@ExperimentalCoroutinesApi
override fun findSomeUsers(batchSize: Int): Flow<User> {
return channelFlow {
em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.asSequence()
.map { it as User }
.forEach { u ->
runBlocking {
send(u)
}
}
}
}
Ну, это работает хорошо, но что-то подозрительно.
Во-первых, почему я не могу просто набрать resultStream.asSequence.map {it as User}.asFlow()
? (На стороне клиента ничего не происходит)
Во-вторых, блок runBlocking
ужасен. runBlocking
следует использовать только в тесте. Но я не нашел способа обойти это в коде.
Есть ли способ решить это?
В-третьих, это не связано с вопросом. Кажется, Spring-Data-JPA до сих пор не поддерживает такой метод запроса:
@Query("select u from User u where ...")
@MaybeSomeQueryHint(batchSize=:batchSize)
fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>
Он загружает всех пользователей, затем жалуется на дублированные строки ...
Клиентский (тестовый) код стороныпросто:
@ExperimentalCoroutinesApi
@Test
@Transactional
open fun testUsers() {
runBlocking {
userDao.findSomeUsers(100).collectIndexed { index, u: User ->
logger.info("[{}] {}", index , u)
}
}
}
Для @Marko версия Stream
работает хорошо:
override fun findSomeUserStream(batchSize: Int): Stream<User> {
return em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.map { it as User }
}
@Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
@Test
open fun testUserStream() {
runBlocking {
userDao.findSomeUserStream(100).forEach { u ->
logger.info("{}" , u)
}
}
}
// it works !!
@Transactional
@Test
open fun testUserStream2() {
runBlocking {
userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
logger.info("{}" , u)
}
}
}