Kinesis Consumer recordProcessor initialize (...) вызывается несколько раз - PullRequest
0 голосов
/ 27 марта 2019

Проблема: initialize(...) метод вызывается несколько раз на recordProcessor, когда этот recordProcessor имеет большое количество дочерних потоков.

Окружающая среда:

AWS SDK: 1.11.498

KCL: 1,9,3

Скала: 2.12.8

JDK: OpenJDK 1.8

Хост: Amazon Linux 2 в ECS [ami-007571470797b8ffa]

Количество осколков: 8

Реализация:

Я реализовал RecordProcessor (расширяющий IRecordProcessor) в Scala. - Этот обработчик записей запускает поток отчетов статистики initialize(...), чтобы сообщать статистику нашим сборщикам - Кроме того, этот обработчик записей внутренне распределяет записи, которые должны обрабатываться, среди нескольких рабочих потоков через рабочую очередь, когда вызывается processRecords(...). Эти рабочие потоки также запускаются на initialize(...).

Проблема: Когда количество осколков равно 8, а число рабочих потоков равно 16, KCL несколько раз вызывает метод initialize(...) на одном и том же recordProcessor. Выдается IllegalThreadStateException, поскольку statsReporterThread пытаются запустить, когда он уже запущен предыдущим initialize(...) вызовом.

Поймайте: Когда количество осколков равно 8, но число рабочих потоков равно 1, множественные вызовы initialize(...) НЕ выполняются, и все работает идеально. Это озадачивает, поскольку рабочие потоки не подвергаются KCL, они являются внутренней реализацией процессора записей. Я подозревал нижние пределы, поэтому увеличил их, это не помогло.

Кроме того, когда это же приложение запускается на моем ноутбуке, оно работает! Но не работает на AWS ECS.

Код:

class RecordProcessor() extends IRecordProcessor {

  val statsReporter = new StatsReporter()
  val statsReporterThread = new Thread(statsReporter)

  val workQueue: LinkedBlockingDeque[Record] = ...
  val workerThreads: ListBuffer[Thread] = ...

  def initialize(shardId) = {
    statsReporterThread.start()

    (0 until 16).foreach(_ => {
      val wThread = new Thread(new Worker(workQueue))
      workerThreads += wThread
      wThread.start()
    })
  }

  def processRecords(records, checkpointer) = {
    records.foreach(record => {
      wq.put(record)
    })

    if (currentTimeMs > nextTimeInMs) {
      checkpoint(checkpointer)
      nextTimeInMs = currentTimeMs + 15000
    }
  }

  def shutdown(checkpointer, reason) = {
    workerThreads.foreach(w => {
      w.interrupt()
      w.join()
    })

    statsReporterThread.interrupt()
    statsReporterThread.join()
  }
}

class Worker(workQueue) extends Runnable {
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      val record = q.take()
      process(record)
    }
  }
}

Любая помощь / указатели будут высоко оценены!

Спасибо!

...