Слик 3 одновременных потока и вставка в одну таблицу вставки блоков - PullRequest
0 голосов
/ 11 декабря 2018

Я пытаюсь дублировать определенные записи с некоторыми изменениями в них (по крайней мере, PK).Учитывая, что количество записей может варьироваться, я выбрал потоковый подход для извлечения данных и стандартный способ Slick для их сохранения.

Вот дайджест:

val query = for {...} yield{...}
val stream = db.stream(query.result.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = 20).transactionally)

Вот определение потока:

val yourFlow = Flow[BigItem].mapAsync[String](1)(mapper)

  var count = 0
  val sink = Sink.foreach[String] { message =>
    count = count + 1
    Logger.info(message + " " + count)
  }

  val src = Source.fromPublisher(a)
  val flow = src.via(yourFlow).to(sink)
  flow.run()

Здесь определяется Sink, просто проверяя, работает ли поток и проходит ли ожидаемое количество элементов.

И есть преобразователь:

val mapper: BigItem => Future[String] = (i: BigItem) => {...}

Опять же, ничего впечатляющего здесь, в картографе, я изменяю объект, создаю DBIO, сохраняю его и все.

db.run(itemRepository.makePersistentDBIO(bigItem)).map(_ => "OK")

На этом этапе код просто зависает.Если я использую простые DBIO, которые ничего не делают с БД, или пытаюсь получить доступ к другим таблицам базы данных, все идет хорошо, и весь поток загружается, но как только я пытаюсь вставить записи в таблицу, потоковая передача которых приводит к получению данных.от, ни одна запись не вставляется.Я пытался играть с изоляцией транзакций, настройкой курсора, гладкой конфигурацией, но безрезультатно.Все соединения с базой данных простаивают, и все просто останавливается.

Интересно, что ручная вставка данных через psql-клиент, как только эта штука зависает, работает, как и ожидалось, что наводит меня на мысль, что речь идет не о взаимоблокировке на уровне Postgres, и этов любом случае это не имеет особого смысла, поскольку select не должен блокировать запись других транзакций.

...