Как выполнить регулирование на основе заданного пользователем аргумента? - PullRequest
0 голосов
/ 11 декабря 2018

Я пишу в распределенную базу данных в памяти, размер пакета которой определяется пользователем в многопоточной среде.Но я хочу ограничить количество строк, записанных в ex.1000 строк / секПричина этого требования заключается в том, что мой производитель пишет слишком быстро, а потребитель сталкивается с ошибкой конечной памяти.Существует ли какая-либо стандартная практика выполнения регулирования при пакетной обработке записей.

dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
      val dbRecords = recordSet.map(m => (m, Events.transform(m)))
      dbRecords.map { record =>
        try {
          Events.setValues(eventInsert, record._2)
          eventInsert.addBatch
        } catch {
          case e: Exception =>
            logger.error(s"error adding batch: ${e.getMessage}")
            val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"event: $error_event")
        }
      }

      // Bulk Commit Records
      try {
        eventInsert.executeBatch
      } catch {
        case e: java.sql.BatchUpdateException =>
          val updates = e.getUpdateCounts
          logger.error(s"failed commit: ${updates.toString}")
          updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
            val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"insert error: $error")
            logger.error(e.getMessage)
          }
      }
      finally {
        connection.commit
        eventInsert.clearBatch
        logger.debug(s"committed: ${dbRecords.length.toString}")
      }
    }

Я надеялся, что смогу передать заданные пользователем аргументы как throttleMax, и если общее количество записей, записанных каждым потоком, достигнет throttleMax, thread.sleep () будет вызываться в течение 1 секунды.Но это сделает весь процесс очень медленным.Может ли быть какой-либо другой эффективный метод, который можно использовать для ускорения загрузки данных до 1000 строк / сек?

1 Ответ

0 голосов
/ 11 декабря 2018

Как и другие (см. Комментарии к вопросу), у вас есть лучшие варианты, чем регулирование здесь.Однако вы можете ограничить операцию в Java с помощью некоторого простого кода, подобного следующему:

/**
 * Given an Iterator `inner`, returns a new Iterator which will emit items upon
 * request, but throttled to at most one item every `minDelayMs` milliseconds.
 */
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
    return new Iterator<T>() {
        private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public T next() {
            long now = System.currentTimeMillis();
            long requiredDelayMs = now - lastEmittedMillis;
            if (requiredDelayMs > 0) {
                try {
                    Thread.sleep(requiredDelayMs);
                } catch (InterruptedException e) {
                    // resume
                }
            }
            lastEmittedMillis = now;

            return inner.next();
        }
    };
}

Приведенный выше код использует Thread.sleep, поэтому он не подходит для использования в Реактивной системе.В этом случае вы захотите использовать реализацию Throttle, предоставленную в этой системе, например, throttle в Akka

...