Akka Streams Kafka consumer does not consume, it only logs WakeupTimeout warnings
02 May 2018

I have an application that we have been testing for several months. The usual setup used either embedded Kafka or Kafka in docker-compose; admittedly, the topic had only one partition. As soon as we deployed it (dockerized, connecting to a real broker, just one), it does not even start consuming and only logs the following:

May  2 10:40:05 ip-10-189-162-144 docker: [WARN] [05/02/2018 10:40:05.533] [KafkaIngester-akka.actor.default-dispatcher-4] [akka://KafkaIngester/system/kafka-consumer-1] KafkaConsumer poll is taking significantly longer (13000ms) to return from poll then the configured poll interval (500ms). Waking up consumer to avoid thread starvation.
May  2 10:40:05 ip-10-189-162-144 docker: [WARN] [05/02/2018 10:40:05.540] [KafkaIngester-akka.actor.default-dispatcher-4] [akka://KafkaIngester/system/kafka-consumer-1] Wake up has been triggered. Dumping stacks: Thread[server-scheduler-1,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.sleep(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
May  2 10:40:05 ip-10-189-162-144 docker: akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:265)
May  2 10:40:05 ip-10-189-162-144 docker: akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[Finalizer,8,system]
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Object.wait(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:212)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.actor.default-dispatcher-5,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-111,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-116,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[persistence.database housekeeper,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.actor.default-dispatcher-4,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.dumpThreads(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.getAllStackTraces(Thread.java:1610)
May  2 10:40:05 ip-10-189-162-144 docker: akka.kafka.KafkaConsumerActor.$anonfun$poll$1(KafkaConsumerActor.scala:283)
May  2 10:40:05 ip-10-189-162-144 docker: akka.kafka.KafkaConsumerActor$$Lambda$675/1405931784.apply$mcV$sp(Unknown Source)
May  2 10:40:05 ip-10-189-162-144 docker: akka.actor.Scheduler$$anon$4.run(Scheduler.scala:140)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-117,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-113,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[server-akka.io.pinned-dispatcher-10,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
May  2 10:40:05 ip-10-189-162-144 docker: akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:128)
May  2 10:40:05 ip-10-189-162-144 docker: akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:246)
May  2 10:40:05 ip-10-189-162-144 docker: akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:161)
May  2 10:40:05 ip-10-189-162-144 docker: akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:67)
May  2 10:40:05 ip-10-189-162-144 docker: akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71)
May  2 10:40:05 ip-10-189-162-144 docker: akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[kafka-producer-network-thread | producer-1,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
May  2 10:40:05 ip-10-189-162-144 docker: sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
May  2 10:40:05 ip-10-189-162-144 docker: org.apache.kafka.common.network.Selector.select(Selector.java:672)
May  2 10:40:05 ip-10-189-162-144 docker: org.apache.kafka.common.network.Selector.poll(Selector.java:396)
May  2 10:40:05 ip-10-189-162-144 docker: org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
May  2 10:40:05 ip-10-189-162-144 docker: org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
May  2 10:40:05 ip-10-189-162-144 docker: org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[DestroyJavaVM,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: Thread[Abandoned connection cleanup thread,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Object.wait(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
May  2 10:40:05 ip-10-189-162-144 docker: com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:43)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-114,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-118,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-115,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
May  2 10:40:05 ip-10-189-162-144 docker: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
May  2 10:40:05 ip-10-189-162-144 docker: java.lang.Thread.run(Thread.java:748)
May  2 10:40:05 ip-10-189-162-144 docker: Thread[KafkaIngester-akka.kafka.default-dispatcher-112,5,main]
May  2 10:40:05 ip-10-189-162-144 docker: sun.misc.Unsafe.park(Native Method)

.....

As you can see, the problem is the WakeupTimeout. I tried increasing it to 13 seconds, but it still fails. I tried increasing the poll interval, but it fails again. We ran telnet kafka 9092 from a Docker container on the same machine, and the connection was accepted. Still, nothing is consumed, and we really do not know why.
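For reference, the telnet check amounts to a plain TCP connect, which can be sketched in Scala using only the JDK. The host name kafka and port 9092 come from the check described above; the object name KafkaPortCheck, the helper canConnect and the 3-second timeout are illustrative assumptions, not part of the application. A successful connect only shows that the port accepts connections; it does not show that the consumer can fetch metadata or join a group.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object KafkaPortCheck {
  // Hypothetical helper: returns true if a TCP connection to host:port
  // can be opened within timeoutMs milliseconds, false otherwise.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unresolvable host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"kafka:9092 reachable: ${canConnect("kafka", 9092)}")
}
```

Run from inside the application container, this would exercise the same network path as the telnet command.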

My stream:

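import akka.Done
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future

// Consumer settings: String keys, raw byte-array values, plus our Kafka properties.
val consumerSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
    .withProperties(kafkaProperties)

// Committable source over the configured topics; counts every received message.
val source =
  Consumer.committableSource(consumerSettings, Subscriptions.topics(topics.toSet)).map { msg =>
    receivedCounter.foreach(_.inc())
    msg
  }

// Process each message, then commit its offset.
source
  .mapAsync(maxParallel)(process)
  .mapAsync(maxParallel)(commit)
  .runWith(Sink.ignore)

// Resets the wait strategies and commits the offset of the given message.
def commit(committableMessage: CommittableMessage[String, Array[Byte]]): Future[Done] = {
  errorReporterWaitStrategy.reset()
  processorWaitStrategy.reset()
  committableMessage.committableOffset.commitScaladsl()
}

// MSG is the message type used by the pipeline (its definition is not shown in these fragments).
def process(msg: MSG): Future[MSG] = {
  receivedCounter.foreach(_.inc())
  innerInvoke(msg)
}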

These are fragments copied from various parts of my code.

...