Сеанс datastax зависает при больших объемах параллельных запросов - PullRequest
0 голосов
/ 16 ноября 2018

Big Picture.

Сеанс Datastax зависает при параллельной обработке 2000 запросов.

Параллельные запросы

Я использую Alpakka, который упаковывает драйвер Datastax Cassandra.Я использую фреймворк Scala Play.

Чтобы подсчитать строки для больших данных, это нужно сделать по разделам.Я использую следующий код для подсчета строк для каждого раздела:

val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
  val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
  targets.isDefined match {
    case true =>
      targets.get.foreach {
        e =>
          val cq: CassandraQueries = new CassandraQueries()
          Logger.info("targets collected so far: "+acc.size)
          Logger.info("Calling count for "+e._1)


          futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
      }

      val results = Future.sequence(futureList.toList)

В одном из моих пространств ключей у меня 2000 разделов, следовательно, 2000 запросов параллельно.

Результаты запроса

Запрос обрабатывается Alpakka / Datastax и возвращает Future[Seq[Row]].

Logger.info("furtureQuery: session closed -> "+ session.isClosed)
    val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
    val sb: StringBuilder = new StringBuilder()
    val source = CassandraSource(stmt)
    source.runWith(Sink.seq).onComplete {
      case Success(f) => out(Some(f), None)

      case Failure(e) =>
        Logger.error("simpleQuery failed with " + e.getMessage)
        out(None, Some(e.getMessage))

    }

Исключение и зависание После примерно 1000 запросов я получаю следующееошибка.После этого с сессии ничего не возвращается.Ни Success, ни Failure не происходит.

akka.ConfigurationException: Логгер, указанный в config, не может быть загружен [akka.event.Logging $ DefaultLogger] из-за [akka.event.Logging $LoggerInitializationException: Logger log1-Logging $ DefaultLogger не ответил с LoggerInitialized, отправлено вместо [TIMEOUT]]

Вопрос

Я уверен, что могу продлить время ожиданиядля ведения журнала.Но это симптом, а не реальная проблема.

Как мне:

  • настроить подключения сеанса для разрешения 2000параллельные запросы?

или

  • ограничивают Future.sequence известным числом возможных запросов?

Также

  • Как программно восстановиться после такого зависания Sessiion?

Ответы [ 2 ]

0 голосов
/ 18 ноября 2018

Скорее всего 2000 запросов запускают запрос диапазона.Использовать метаданные объектов кластера, получить диапазон токенов и вычислить токен ключей.Затем пакетные запросы, которые попадают под тот же диапазон в одном запросе диапазона.

0 голосов
/ 17 ноября 2018

Вы можете увеличить количество запросов в полете на соединение, указав параметры объединения при создании экземпляра кластера, например:

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, 10240);

Cluster cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .build();

Но вам все равно нужно обработать BusyPoolException в своем коде,потому что при использовании асинхронных запросов все равно легко перегружать одно конкретное соединение.

Больше информации в документации драйвера .

...