Почему поток никогда не запускается? - PullRequest
0 голосов
/ 02 июля 2019

У меня есть следующий поток, который никогда не достигнет map после flatMapConcat.

  private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
  : KillSwitch = {

    implicit val materializer = ActorMaterializer()
    implicit val dispatcher = materializer.executionContext

    system.log.info("=============> Start KafkaDetectorStream <=============")

    val addr = system
      .settings
      .config
      .getConfig("kafka")
      .getString("servers")

    val sink: Sink[ServerHealthEvent, NotUsed] =
      ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
        ref = ref,
        onCompleteMessage = Complete,
        onFailureMessage = Fail.apply,
        messageAdapter = Message.apply,
        onInitMessage = Init.apply,
        ackMessage = Ack)

    Source.tick(1.seconds, 5.seconds, NotUsed)
      .flatMapConcat(_ => Source.fromFuture(health(addr)))
      .map {
        case true =>
          KafkaActiveConfirmed
        case false =>
          KafkaInactiveConfirmed
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .to(sink)
      .run()
  }

  private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
    props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")

    Future {
      AdminClient
        .create(props)
        .listTopics()
        .names()
        .get()
    }
      .map(_ => true)
      .recover {
        case _: Throwable => false
      }
  }

Я имею в виду, что эта часть:

.map {
  case true =>
    KafkaActiveConfirmed
  case false =>
    KafkaInactiveConfirmed
} 

никогда не выполняется, и я не знаю причину.Метод health выполняется как ожидалось.

1 Ответ

2 голосов

Попробуйте добавить .log между flatMapConcat и map, чтобы увидеть испускаемый элемент. log может еще регистрировать ошибки и отмену потока. https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/log.html

Примечание, .log с использованием неявного регистратора

И ваши .flatMapConcat(_ => Source.fromFuture(health(addr))) швы трики, попробуйте .mapAsyncUnordered(1)(_ => health(addr))

...