Как я могу ограничить операции записи до 1k записей / сек? - PullRequest
0 голосов
/ 21 ноября 2018

В настоящее время я могу записать в базу данных размером 500 пакетов. Но из-за ошибки нехватки памяти и задержки синхронизации между дочерним агрегатором и конечным узлом базы данных иногда я сталкиваюсь с ошибкой памяти конечного узла.Единственное решение для этого - если я ограничу свои операции записи до 1 000 записей в секунду, я смогу избавиться от ошибки.

dataStream
  .map(line => readJsonFromString(line))
  .grouped(memsqlBatchSize)
  .foreach { recordSet =>
          val dbRecords = recordSet.map(m => (m, Events.transform(m)))
          dbRecords.map { record =>
            try {
              Events.setValues(eventInsert, record._2)
              eventInsert.addBatch
            } catch {
              case e: Exception =>
                logger.error(s"error adding batch: ${e.getMessage}")
                val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
                logger.error(s"event: $error_event")
            }
          }

          // Bulk Commit Records
          try {
            eventInsert.executeBatch
          } catch {
            case e: java.sql.BatchUpdateException =>
              val updates = e.getUpdateCounts
              logger.error(s"failed commit: ${updates.toString}")
              updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
                val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
                logger.error(s"insert error: $error")
                logger.error(e.getMessage)
              }
          }
          finally {
            connection.commit
            eventInsert.clearBatch
            logger.debug(s"committed: ${dbRecords.length.toString}")
          }
        }

Причиной записи 1k является то, что некоторые данные, которые я пытаюсь записать, могут содержать тонны записей json, а если размер пакета равен 500, это может привести к 30 тысячам записей в секунду.Есть ли способ, чтобы я мог быть уверен, что в базу данных будет записано только 1000 записей независимо от количества записей?

Ответы [ 2 ]

0 голосов
/ 22 ноября 2018

Я не думаю, что Thead.sleep - хорошая идея справиться с этой ситуацией.Как правило, мы не рекомендуем делать это в Scala и не хотим блокировать поток в любом случае.

Одним из предложений будет использование любых потоковых методов, таких как Akka.Stream, Monix.Observable.Есть некоторые плюсы и минусы между этими библиотеками, я не хочу тратить на это слишком много абзацев.Но они поддерживают обратное давление, чтобы контролировать скорость производства, когда потребитель медленнее, чем производитель.Например, в вашем случае ваш потребитель пишет базу данных, а ваш продюсер, возможно, читает некоторые файлы json и выполняет некоторые агрегации.

Следующий код иллюстрирует идею, и вам нужно будет изменить ее по мере необходимости:

val sourceJson = Source(dataStream.map(line => readJsonFromString(line)))
val sinkDB = Sink(Events.jm.writeValueAsString) // you will need to figure out how to generate the Sink
val flowThrottle = Flow[String]
  .throttle(1, 1.second, 1, ThrottleMode.shaping)

val runnable = sourceJson.via[flowThrottle].toMat(sinkDB)(Keep.right)
val result = runnable.run()
0 голосов
/ 22 ноября 2018

Блок кода уже вызывается потоком, и параллельно работают несколько потоков.Либо я могу использовать Thread.sleep(1000) или delay(1.0) в этом скала-коде.Но если я использую delay(), он будет использовать обещание, которое может вызываться вне функции.Похоже, Thread.sleep() - лучший вариант вместе с размером партии 1000.После тестирования я смог без проблем протестировать 120000 записей / поток / сек.

В соответствии с архитектурой memsql все загрузки в memsql сначала поступают в хранилище строк в локальную память, а оттуда memsql сливается с хранилищем столбцов в конце листьев.Это приводило к листовой ошибке каждый раз, когда я помещал больше данных, вызывая узкое место.Уменьшение размера пакета и введение Thread.sleep () помогли мне написать 120000 записей в секунду.Выполнено тестирование с этим тестом.

...