Big Picture.
Сеанс Datastax зависает при параллельной обработке 2000 запросов.
Параллельные запросы
Я использую Alpakka, который упаковывает драйвер Datastax Cassandra.Я использую фреймворк Scala Play.
Чтобы подсчитать строки для больших данных, это нужно сделать по разделам.Я использую следующий код для подсчета строк для каждого раздела:
val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
targets.isDefined match {
case true =>
targets.get.foreach {
e =>
val cq: CassandraQueries = new CassandraQueries()
Logger.info("targets collected so far: "+acc.size)
Logger.info("Calling count for "+e._1)
futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
}
val results = Future.sequence(futureList.toList)
В одном из моих пространств ключей у меня 2000 разделов, следовательно, 2000 запросов параллельно.
Результаты запроса
Запрос обрабатывается Alpakka / Datastax и возвращает Future[Seq[Row]].
Logger.info("furtureQuery: session closed -> "+ session.isClosed)
val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
val sb: StringBuilder = new StringBuilder()
val source = CassandraSource(stmt)
source.runWith(Sink.seq).onComplete {
case Success(f) => out(Some(f), None)
case Failure(e) =>
Logger.error("simpleQuery failed with " + e.getMessage)
out(None, Some(e.getMessage))
}
Исключение и зависание После примерно 1000 запросов я получаю следующееошибка.После этого с сессии ничего не возвращается.Ни Success
, ни Failure
не происходит.
akka.ConfigurationException: Логгер, указанный в config, не может быть загружен [akka.event.Logging $ DefaultLogger] из-за [akka.event.Logging $LoggerInitializationException: Logger log1-Logging $ DefaultLogger не ответил с LoggerInitialized, отправлено вместо [TIMEOUT]]
Вопрос
Я уверен, что могу продлить время ожиданиядля ведения журнала.Но это симптом, а не реальная проблема.
Как мне:
- настроить подключения сеанса для разрешения 2000параллельные запросы?
или
- ограничивают Future.sequence известным числом возможных запросов?
Также
- Как программно восстановиться после такого зависания Sessiion?