Как отловить java.net.ConnectException: соединение отказано на акке стерам? - PullRequest
1 голос
/ 21 апреля 2019

У меня есть потребитель kafka, который выглядит следующим образом:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {


    implicit val system = ActorSystem("SAP-SENDER")
    implicit val executor = system.dispatcher
    implicit val materilizer = ActorMaterializer()

    val config = system.settings.config.getConfig("akka.kafka.consumer")

    val consumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9003")
        .withGroupId("SAPSENDER")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.topics("TEST-TOPIC")
      )
      .runWith(Sink.foreach(println))
      .onComplete{
        case Success(_) => println("Goood")
        case Failure(ex) =>
          println(s"I am failed ==============> ${ex.getMessage}")
          system.terminate()
      }

  }
} 

Сервер kafka не активен, и я хотел бы просто прекратить потребителя. Он всегда пытается подключиться и показывает следующие сообщения:

19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.478 [SAP-SENDER-akka.kafka.default-dispatcher-20] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []   

также сказано:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:515)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:380)
    at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:360)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:221)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.Timers.aroundReceive(Timers.scala:51)
    at akka.actor.Timers.aroundReceive$(Timers.scala:40)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
    at akka.actor.ActorCell.invoke(ActorCell.scala:579)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)  

Как перехватить ConnectException в потоке и не дать потребителю попытаться подключить kafka.

Код находится здесь https://gitlab.com/akka-samples/kafkaconsumer.

Ответы [ 3 ]

2 голосов
/ 26 апреля 2019

С Kafka Client 2.0+ Alpakka Kafka не может заметить, что по указанным адресам нет брокера Kafka.

См. https://github.com/akka/alpakka-kafka/issues/674

1 голос
/ 22 апреля 2019

Глядя на это PR и работу по обновлению до клиента kafka 2.0, я бы предположил, что ответственность за повторные попытки была передана клиенту kafka.Например, я попытался передать эти свойства

val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
    .withProperties(
      "reconnect.backoff.ms" -> "10000",
      "reconnect.backoff.max.ms" -> "20000"
    )
    .withBootstrapServers("localhost:9099")
    .withGroupId("SAPSENDER")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

И исключение появляется во второй раз после 10 секунд.Я нашел эти свойства здесь

Учитывая это, я думаю, что может быть отсутствующая функция с новой адаптацией клиента kafka, так как KafkaConsumerActor не предоставляет исключение для потока, я пыталсяразличные комбинации с использованием вашего репо, но я все еще получаю непрерывный поток отладочных сообщений.

Я надеюсь, что это дает некоторый намек в правильном направлении, пожалуйста, дайте нам знать, если вы решите это.

0 голосов
/ 26 апреля 2019

Вы должны контролировать свой поток и перезапустить его, если есть ошибки.Например, вы можете запустить свой поток внутри актера и обрабатывать соединения с ошибками под наблюдением актера.

Ошибки соединений, вероятно, будут длиться в течение пары секунд (возможно, сеть перегружена), поэтому вам следует использовать обратнуюОтключить стратегию, чтобы избежать повторных штормов.

Поток Akka уже дает вам простой способ сделать это для потоков, использующих RestartSource.См. Обработка ошибок

val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val result = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      // this is a hack to get access to the Consumer.Control
      // instances of the latest Kafka Consumer source
      .mapMaterializedValue(c => control.set(c))
      .via(businessFlow)
  }
  .runWith(Sink.seq)

control.get().shutdown()

Это решение будет работать только тогда, когда вы запускаете поток, а посредник не работает, потому что тогда потребитель выдает исключение при попытке его создать.Однако, если вам удастся создать своего потребителя, и после этого весь кластер kafka выйдет из строя, внутренняя KafkaConsumer будет использовать упомянутую конфигурацию reconnect.backoff.ms и reconnect.backoff.max.ms для повторного подключения, и ваш поток не потерпит неудачу.

Если вы хотитеограничить количество пенсионеров, вы должны сделать следующее

val result: Future[Done] = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () => // your consumer  
  }.
  .take(3) // retries limit
  .runWith(Sink.ignore)

result.onComplete {
  case _ => println("Max retries reached")
}
...