проблемы с запуском скрипта для чтения из mysql и вставки в mongo (проблемы многопоточности) с использованием play framework - PullRequest
0 голосов
/ 24 декабря 2018

У меня есть скрипт, который я хочу запустить, чтобы просмотреть таблицу mysql, содержащую около 4M записей, и для каждой из них я выполняю другой вызов другой таблицы mysql, чтобы получить больше данных, из этих данных я создаю новый объект и вставляю егона mongodb.

, поэтому в основном я хочу выполнить миграцию с mysql на mongodb

im, используя quill, которая является библиотекой для работы с sql в вашем проекте scala (QDSL).

my script class is very short and looks like:

class MigrateScript @Inject()(dao: PeopleDao) {

  lazy val ctx = new MysqlAsyncContext(SnakeCase, "mysql")

  import ctx._

      def getNextPerson(idx: Int): Future[Person] = {
        val q = quote {
          query[Person].drop(lift(idx)).take(1)
        }
        ctx.run(q.take(1)).map(_.head) recover {
          case t: NoSuchElementException =>
            println(s"problem with index: $idx")
            throw new RuntimeException
        }
      }

      def getPersonKids(personId: Int): Future[List[Kid]] = {
        val q = quote {
          query[Kid].filter(kid => kid.parent_id == lift(personId))
        }
        ctx.run(q)
      }


      def runScript(numberOfRecords: Int): Unit = {
        for (a <- 0 to numberOfRecords) {
          getNextPerson(a).map(person => {
            getPersonKids(person.id).map(kids => {
              // create obj and insert to mongo
              val parentWithKidsObjectToInsert = // creating new object using person & kids

              dao.insert(parentWithKidsObjectToInsert) // this returns Future[String]
            })
          })
        }
      }

}

чтобы запустить его, я делаю это из моего контроллера следующим образом:

  def insertMySqlRecordsToMongo(numberOfRecords: Int) = Action { request =>
    mScript.runScript(numberOfRecords)
    Ok
  }

выдает:

  1. когда я запускаю его таким образом, скрипт застреваетпосле 100 + - записей и я получаю ошибки в моем журнале:

    java.util.concurrent.TimeoutException: фьючерсы истекли через [5 секунд]

и

WARN   [ousekeeper] - c.z.h.p.HikariPool(755)        - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m51s17ms).

чувствуется, что приложение работает быстрее, чем может обработать пул соединений mysql ...

поэтому я попытался добавить Await.result выше getNextPerson, и он работает хорошо, но очень медленно.Это только вставка 300 записей в минуту, что может занять несколько дней, чтобы просмотреть записи 4M ...

Какие-либо решения, пожалуйста?спасибо, кто нашел время, чтобы понять это:

1 Ответ

0 голосов
/ 24 декабря 2018

Я действительно, очень советую вам заглянуть в Spark, я звучу как типичный сценарий использования ETL.Проблема в том, что вы материализуете тысячи записей в память, которая убивает ваш GC и останавливает фьючерсы.Кроме того, тот факт, что вы должны делать это запись за записью, делает это очень, очень медленно.Если вместо этого вы загрузите это в кадры данных Spark, я возьму меньше места, потому что Spark фактически не материализует записи в память (они используют чрезвычайно компактную двоичную сериализацию в памяти, которая «разливается» на диск при необходимости), что экономит вашикуча от GC-уничтожения.Кроме того, он выполняет загрузку и преобразования для многих, многих записей параллельно.Это даст вам характеристики предформативности, чтобы ваша проблема была решаемой.

Вот примерно то, что я бы сделал:

  1. Загрузите записи в набор данных Spark, используя spark.read.jdbc
  2. Присоединение к наборам данных и группе с помощью родительской записи
  3. Запись записей в MongoDB с помощью Mongo Spark Collector

Сам код долженВыглядит примерно так:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff

SparkSession.builder()
      .master("local")
      .appName("Load records to mongo")
       // Configure the spark->mongo connector
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .enableHiveSupport()
      .getOrCreate()

case class PersonWithKids(person:Person, kids:List[Kid])

// make sure the MySQL jdbc driver is not the classpath because spark is about to use it
val people = spark.jdbc.read("(select * from people) as data", "jdbc://...").as[Person]
val kids = spark.jdbc.read("(select * from kids) as data", "jdbc://...").as[Kid]
val joined = 
  people
    .joinWith(kids, people("id") === kids("parent_id"))
    .map({case (person, kid) => PersonWithKids(person, List(kid))})
    .groupByKey(_.person)
    .flatMapGroups({case (person, personWithKidIter) => 
        PersonWithKids(person, personWithKidIter.toList.flatMap(_.kids))
    })

// make sure you did stuff correctly
// joined.show()
// joined.take(100).write.json("someFileWhereYouCanDoubleCheck.json")

MongoSpark.save(joined)

Возможно, вам понадобятся следующие зависимости SBT:

"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now

Удачи!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...