Как спроектировать рабочую вертикалку с бесконечной петлей блокировки? - PullRequest
0 голосов
/ 10 марта 2019

Я пытаюсь составить рабочую статью, которая соединит подписку на тему облака Google PubSub с шиной событий из vert.x, приняв пример kotlin PubSub в сочетании с this ответ относительно работника с бесконечной обработкой цикла блокировки.

Это работает, но Vert.X продолжает ворчать в журнал, что Thread blocked, выбрасывая исключение через некоторое время после получения сообщения от PubSub (пожалуйста, игнорируйте блокировку инициализациина данный момент):

9:15:12 AM: Executing task 'run'...

WARNING: You are a using release candidate 2.0.0-rc5. Behavior of this plugin has changed since 1.3.5. Please see release notes at: https://github.com/GoogleCloudPlatform/app-gradle-plugin.
Missing a feature? Can't get it to work?, please file a bug at: https://github.com/GoogleCloudPlatform/app-gradle-plugin/issues.
:compileKotlin UP-TO-DATE
:compileJava NO-SOURCE
:processResources NO-SOURCE
:classes UP-TO-DATE
Mar 10, 2019 9:15:18 AM io.vertx.core.impl.launcher.commands.Watcher
INFO: Watched paths: [/home/username/IdeaProjects/project_name/./src]
Mar 10, 2019 9:15:18 AM io.vertx.core.impl.launcher.commands.Watcher
INFO: Starting the vert.x application in redeploy mode
:run
Starting vert.x application...
f48ba7fd-a52b-487f-b553-2b74473e58ba-redeploy
Creating topic gcs-project-id:vertx.
Mar 10, 2019 9:15:18 AM com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials
WARNING: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.
Mar 10, 2019 9:15:21 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2759 ms, time limit is 2000 ms
Topic gcs-project-id:vertx successfully created.
Creating subscription gcs-project-id:kotlin.
Mar 10, 2019 9:15:22 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3759 ms, time limit is 2000 ms
Mar 10, 2019 9:15:23 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4758 ms, time limit is 2000 ms
Mar 10, 2019 9:15:24 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5759 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:142)
    at com.google.common.util.concurrent.Futures.getUnchecked(Futures.java:1309)
    at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:52)
    at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
    at com.google.cloud.pubsub.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:359)
    at com.google.cloud.pubsub.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:260)
    at com.example.project.MainVerticle.subscribeTopic(MainVerticle.kt:76)
    at com.example.project.MainVerticle.init(MainVerticle.kt:46)
    at io.vertx.core.impl.DeploymentManager.lambda$doDeploy$8(DeploymentManager.java:492)
    at io.vertx.core.impl.DeploymentManager$$Lambda$28/1902260856.handle(Unknown Source)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
    at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
    at io.vertx.core.impl.EventLoopContext$$Lambda$29/1640639994.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Subscription gcs-project-id:kotlin successfully created.
Listening to messages on kotlin:
Mar 10, 2019 9:15:25 AM io.vertx.core.impl.launcher.commands.VertxIsolatedDeployer
INFO: Succeeded in deploying verticle

Message Id: 462746807438186 Data: Bazinga
Message Id: 462746750387788 Data: Another message

Mar 10, 2019 9:16:25 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-0,5,main] has been blocked for 60171 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
    at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:32)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:13)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:272)
    at io.vertx.core.impl.ContextImpl$$Lambda$33/1101004004.run(Unknown Source)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at io.vertx.core.impl.TaskQueue$$Lambda$26/1213216872.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Mar 10, 2019 9:16:26 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-0,5,main] has been blocked for 61172 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
    at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:32)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:13)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:272)
    at io.vertx.core.impl.ContextImpl$$Lambda$33/1101004004.run(Unknown Source)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at io.vertx.core.impl.TaskQueue$$Lambda$26/1213216872.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

А вот и исходный код:

package com.example.project_name

import com.google.api.gax.rpc.ApiException
import com.google.cloud.pubsub.v1.*
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.ProjectTopicName
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.PushConfig
import io.vertx.core.*
import java.util.concurrent.LinkedBlockingDeque


class MainVerticle : MessageReceiver, AbstractVerticle() {
  private val projectId = "gcs-project-id"
  private val topicId = "vertx"
  private val topic: ProjectTopicName = ProjectTopicName.of(projectId, topicId)
  private val subscriptionId = "kotlin"
  private val subscription = ProjectSubscriptionName.of(projectId, subscriptionId)
  private val messages = LinkedBlockingDeque<PubsubMessage>()
  private lateinit var subscriber: Subscriber

  override fun receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer) {
    messages.offer(message)
    consumer.ack()
  }

  override fun start() {
    vertx.executeBlocking<Void>({
      try {
        println("Listening to messages on $subscriptionId:")
        subscriber.awaitRunning()
        while (true) {
          val message = messages.take()
          println("Message Id: ${message.messageId} Data: ${message.data.toStringUtf8()}")
        }
      } finally {
        subscriber.stopAsync()
        it.complete()
      }
    }, { println("done, ${it.cause()}") })
  }

  override fun init(vertx: Vertx?, context: Context?) {
    super.init(vertx, context)
    try {
      createTopic()
      subscribeTopic()
      subscriber = Subscriber.newBuilder(subscription, this).build()
      subscriber.startAsync()
    } catch (e: ApiException) {
      // example : code = ALREADY_EXISTS(409) implies topic already exists
      println("Failed: $e")
    }
  }

  override fun stop(stopFuture: Future<Void>?) {
    super.stop(stopFuture)
    try {
      deleteSub()
      deleteTopic()
    } catch (e: ApiException) {
      println("Failed: $e")
    } finally {
      subscriber.stopAsync()
      stopFuture!!.complete()
    }
  }

  private fun createTopic() { // expects 1 arg: <topic> to create
    println("Creating topic ${topic.project}:${topic.topic}.")
    TopicAdminClient.create().use { topicAdminClient -> topicAdminClient.createTopic(topic) }
    println("Topic ${topic.project}:${topic.topic} successfully created.")
  }

  private fun subscribeTopic() { // expects 2 args: <topic> and <subscription>
    println("Creating subscription ${subscription.project}:${subscription.subscription}.")
    SubscriptionAdminClient.create().use { it.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0) }
    println("Subscription ${subscription.project}:${subscription.subscription} successfully created.")
  }

  private fun deleteTopic() {
    println("Deleting topic ${topic.project}:${topic.topic}.")
    TopicAdminClient.create().use { it.deleteTopic(topic) }
    println("Topic ${topic.project}:${topic.topic} successfully deleted.")
  }

  private fun deleteSub() { // expects 1 arg: <subscription> to delete
    println("Deleting subscription ${subscription.project}:${subscription.subscription}.")
    SubscriptionAdminClient.create().use { it.deleteSubscription(subscription) }
    println("Subscription ${subscription.project}:${subscription.subscription} successfully deleted.")
  }
}

fun main(vararg args: String) {
  Vertx.vertx().deployVerticle(MainVerticle(), DeploymentOptions().apply {
    isWorker = true
  })
}

Я явно что-то упускаю.Также, если у вас есть лучший подход, который может интегрировать / объединить библиотеку Google PubSub (которая имеет собственный асинхронный цикл) с Vert.X, я был бы рад услышать о моем примитивном примере.

1 Ответ

1 голос
/ 11 марта 2019

Проблема в вашем while цикле.

«Блокировка» в этом случае не означает , что означает, что вы можете продолжать работать вечно. Ваш звонок на it.complete() никогда не будет достигнут, и в какой-то момент Vert.x пожалуется на это.

См. Руководство по Запуск кода блокировки , в частности, раздел ПРЕДУПРЕЖДЕНИЕ.

Чтобы решить вашу проблему, вам нужно запланировать ваши звонки на messages.take() так или иначе, например, используя setPeriodic . Внутри обработчика интервала очистите свою очередь с помощью executeBlocking, затем верните управление, вызвав complete(), до или после того, как вы запланировали обработку сообщений, в зависимости от того, заботитесь ли вы о результате.

...