Я пытаюсь дублировать определенные записи с некоторыми изменениями в них (по крайней мере, PK).Учитывая, что количество записей может варьироваться, я выбрал потоковый подход для извлечения данных и стандартный способ Slick для их сохранения.
Вот дайджест:
val query = for {...} yield{...}
val stream = db.stream(query.result.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = 20).transactionally)
Вот определение потока:
val yourFlow = Flow[BigItem].mapAsync[String](1)(mapper)
var count = 0
val sink = Sink.foreach[String] { message =>
count = count + 1
Logger.info(message + " " + count)
}
val src = Source.fromPublisher(a)
val flow = src.via(yourFlow).to(sink)
flow.run()
Здесь определяется Sink, просто проверяя, работает ли поток и проходит ли ожидаемое количество элементов.
И есть преобразователь:
val mapper: BigItem => Future[String] = (i: BigItem) => {...}
Опять же, ничего впечатляющего здесь, в картографе, я изменяю объект, создаю DBIO, сохраняю его и все.
db.run(itemRepository.makePersistentDBIO(bigItem)).map(_ => "OK")
На этом этапе код просто зависает.Если я использую простые DBIO, которые ничего не делают с БД, или пытаюсь получить доступ к другим таблицам базы данных, все идет хорошо, и весь поток загружается, но как только я пытаюсь вставить записи в таблицу, потоковая передача которых приводит к получению данных.от, ни одна запись не вставляется.Я пытался играть с изоляцией транзакций, настройкой курсора, гладкой конфигурацией, но безрезультатно.Все соединения с базой данных простаивают, и все просто останавливается.
Интересно, что ручная вставка данных через psql-клиент, как только эта штука зависает, работает, как и ожидалось, что наводит меня на мысль, что речь идет не о взаимоблокировке на уровне Postgres, и этов любом случае это не имеет особого смысла, поскольку select не должен блокировать запись других транзакций.