У меня есть следующий поток, который никогда не достигнет map
после flatMapConcat
.
private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
: KillSwitch = {
implicit val materializer = ActorMaterializer()
implicit val dispatcher = materializer.executionContext
system.log.info("=============> Start KafkaDetectorStream <=============")
val addr = system
.settings
.config
.getConfig("kafka")
.getString("servers")
val sink: Sink[ServerHealthEvent, NotUsed] =
ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
ref = ref,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
Source.tick(1.seconds, 5.seconds, NotUsed)
.flatMapConcat(_ => Source.fromFuture(health(addr)))
.map {
case true =>
KafkaActiveConfirmed
case false =>
KafkaInactiveConfirmed
}
.viaMat(KillSwitches.single)(Keep.right)
.to(sink)
.run()
}
private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
val props = new Properties
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")
Future {
AdminClient
.create(props)
.listTopics()
.names()
.get()
}
.map(_ => true)
.recover {
case _: Throwable => false
}
}
Я имею в виду, что эта часть:
.map {
case true =>
KafkaActiveConfirmed
case false =>
KafkaInactiveConfirmed
}
никогда не выполняется, и я не знаю причину.Метод health
выполняется как ожидалось.