Проблема: initialize(...)
метод вызывается несколько раз на recordProcessor, когда этот recordProcessor имеет большое количество дочерних потоков.
Окружающая среда:
AWS SDK: 1.11.498
KCL: 1,9,3
Скала: 2.12.8
JDK: OpenJDK 1.8
Хост: Amazon Linux 2 в ECS [ami-007571470797b8ffa]
Количество осколков: 8
Реализация:
Я реализовал RecordProcessor (расширяющий IRecordProcessor) в Scala.
- Этот обработчик записей запускает поток отчетов статистики initialize(...)
, чтобы сообщать статистику нашим сборщикам
- Кроме того, этот обработчик записей внутренне распределяет записи, которые должны обрабатываться, среди нескольких рабочих потоков через рабочую очередь, когда вызывается processRecords(...)
. Эти рабочие потоки также запускаются на initialize(...)
.
Проблема:
Когда количество осколков равно 8, а число рабочих потоков равно 16, KCL несколько раз вызывает метод initialize(...)
на одном и том же recordProcessor. Выдается IllegalThreadStateException
, поскольку statsReporterThread
пытаются запустить, когда он уже запущен предыдущим initialize(...)
вызовом.
Поймайте:
Когда количество осколков равно 8, но число рабочих потоков равно 1, множественные вызовы initialize(...)
НЕ выполняются, и все работает идеально.
Это озадачивает, поскольку рабочие потоки не подвергаются KCL, они являются внутренней реализацией процессора записей.
Я подозревал нижние пределы, поэтому увеличил их, это не помогло.
Кроме того, когда это же приложение запускается на моем ноутбуке, оно работает! Но не работает на AWS ECS.
Код:
class RecordProcessor() extends IRecordProcessor {
val statsReporter = new StatsReporter()
val statsReporterThread = new Thread(statsReporter)
val workQueue: LinkedBlockingDeque[Record] = ...
val workerThreads: ListBuffer[Thread] = ...
def initialize(shardId) = {
statsReporterThread.start()
(0 until 16).foreach(_ => {
val wThread = new Thread(new Worker(workQueue))
workerThreads += wThread
wThread.start()
})
}
def processRecords(records, checkpointer) = {
records.foreach(record => {
wq.put(record)
})
if (currentTimeMs > nextTimeInMs) {
checkpoint(checkpointer)
nextTimeInMs = currentTimeMs + 15000
}
}
def shutdown(checkpointer, reason) = {
workerThreads.foreach(w => {
w.interrupt()
w.join()
})
statsReporterThread.interrupt()
statsReporterThread.join()
}
}
class Worker(workQueue) extends Runnable {
override def run(): Unit = {
while (!Thread.currentThread().isInterrupted) {
val record = q.take()
process(record)
}
}
}
Любая помощь / указатели будут высоко оценены!
Спасибо!