Почему регистрация не работает для Akka Stream - PullRequest
0 голосов
/ 02 мая 2019

Я использую Alpakka и у меня есть пример игрушки ниже:

val system = ActorSystem("system")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
implicit val ec: ExecutionContextExecutor = system.dispatcher

val log = Logger(this.getClass, "Foo")

val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")

def start() = {
  Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
    .log("My Consumer: ")
    .withAttributes(
      Attributes.logLevels(
        onElement = Logging.InfoLevel,
        onFinish = Logging.InfoLevel,
        onFailure = Logging.DebugLevel
      )
    )
    .filter(//some predicate)
    .map(// some process)
    .map(out => ByteString(out))
    .runWith(LogRotatorSink(timeFunc))
    .onComplete {
      case Success(_) => log.info("DONE")
      case Failure(e) => log.error("ERROR")
    }
}

Этот код работает.Но у меня возникла проблема с регистрацией.Первая часть с атрибутами - логирование.Когда элемент входит, он делает запись в стандартный вывод.Но когда LogRotatorSink заканчивается и будущее завершается, я хочу вывести DONE на стандартный вывод.Это не работает.Файл создается, поэтому процесс работает, но нет сообщения «ВЫПОЛНЕНО» на стандартный вывод.

Как мне получить "ВЫПОЛНЕНО" на стандартный вывод, пожалуйста?

akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "INFO"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

}


<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%highlight(%date{HH:mm:ss.SSS} %-5level %-50.50([%logger{50}])) - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="org.apache.kafka" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>

1 Ответ

2 голосов
/ 03 мая 2019

Журнал работает - это ваш Future, который не заканчивается, потому что Kafka Consumer - это бесконечный поток - когда он прочитает все и достигнет самых новых сообщений в теме ... он будет ждать появления новых сообщений- во многих случаях, например, даже закрытие источника такого потока на ровном месте было бы катастрофой, поэтому бесконечно работающий поток по умолчанию является разумным выбором.

Когда этот поток должен фактически закончиться?Четко определите это условие, и вы сможете использовать что-то вроде .take(n), .takeUntil(cond), .takeWithin(time), чтобы закрыть его в явно определенных условиях.Затем поток закроется, Future завершится, и ваш DONE будет напечатан.

...