Akka Streams - Поток Большой Стол - PullRequest
0 голосов
/ 26 февраля 2020

Как я могу передать все строки из большой таблицы, используя Akka Streams?

Мне нужно вывести некоторые данные из реляционной БД. В некоторых случаях я получаю большой список идентификаторов. В других случаях мне нужно вывести всю таблицу до 1e9 строк. У меня есть рабочее решение для случая, когда у меня есть (большой) список идентификаторов PK (см. Ниже). Я разделил список идентификаторов на 1000-элементные партии, выбираю по одной партии за раз и возвращаю данные. Но это не работает в тех случаях, когда мне нужно выполнить потоковую передачу всей таблицы, когда у меня нет списка идентификаторов PK. Я имею в виду

  1. выберите min (pk_id), max ((pk_id) из my_table
  2. , чтобы разделить интервал (min_id, max_id) на N пакетов так, чтобы размер каждой партии Используйте приведенное ниже решение для потоковой передачи по одному такому пакету за раз

Но это кажется сложным. Есть ли лучшие проекты? (Это мой первый проект Akka Streams.)

import java.sql.Connection    
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.typesafe.config.Config    
import scala.collection.mutable
import scala.util.control.NonFatal

class DbSource(tableName: String, abc: String) extends GraphStage[SourceShape[String]] {
  private val baseQry: String = query(tableName, abc)
  private val jdbc: Config = appConfig.config.getConfig("jdbc")
  private implicit val conn: Connection = ...
  private var batchID = 0
  private var numRows = 0
  private val batches: Seq[Seq[Long]] = idBatches(tableName, idName, pkIDs)

  val out: Outlet[String] = Outlet.create(s"$tableName.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(out, new AbstractOutHandler() {
        override def onPull(): Unit = {
          maybeSomeBatch.map(extractBatch).foreach(msg => push(out, msg))
        }
      })
    }
  }

  private def maybeSomeBatch: Option[Seq[Long]] = {
    if (batchID < batches.size) {
      batchID += 1
      Some(batches(batchID - 1)) // because we have already incremented it
    } else None
  }

  override val shape: SourceShape[String] = SourceShape.of(out)

  private def extractBatch(batch: Seq[Long]): String = {
      // select from table where ID in batch
      // package the data and return as a string
  }
}
...