Имя темы в Akka, регистрирующемся на актере относительно PinnedDispatcher - PullRequest
0 голосов
/ 03 марта 2020

Я кодирую приложение с помощью Akka v2.5.23. В приложении задействованы следующие актеры:

  • Класс актера маршрутизатора с именем CalculatorRouter
  • Класс актера маршрутизатора с именем Calculator

Я настроил PinnedDispatcher при создании Вычислите актера и поместите log.info в метод получения класса этого актера. Я ожидал увидеть в файле журнала поле имени потока, которое будет содержать pinned. Однако поле имени потока - default-dispatcher. Я искал в файле журнала и обнаружил, что все имя потока по отношению к этому log.info будет default-dispatcher. Что-то не так с моим кодом?

Фрагмент файла журнала:

09:49:25.116 [server-akka.actor.default-dispatcher-14] INFO  handler.Calculator $anonfun$applyOrElse$3 92 - akka://server/user/device/$a/$a Total calc received

Ниже приведены фрагменты кода:

class CalculatorRouter extends Actor with ActorLogging {
    var router = {
        val routees = Vector.fill(5) {
            val r = context.actorOf(Props[Calculator].withDispatcher("calc.my-pinned-dispatcher"))
            context.watch(r)
            ActorRefRoutee(r)
        }
        Router(SmallestMailboxRoutingLogic(), routees)
    }

    def receive = {
        case w:  Calc => router.route(w, sender)
        case Terminated(a) =>
            router.removeRoutee(a)
            val r = context.actorOf(Props[Calculator].withDispatcher("calc.my-pinned-dispatcher"))
            context.watch(r)
            router = router.addRoutee(r)
    }
}

Cal c .my- закрепленный диспетчер настроен следующим образом:

  calc.my-pinned-dispatcher {
    executor="thread-pool-executor"
    type=PinnedDispatcher
  }

Исходный код калькулятора классов выглядит следующим образом:

class Calculator extends Actor with ActorLogging {
    val w = new UdanRemoteCalculateTotalBalanceTime

    def receive = {
        case TotalCalc(fn, ocvFilepath, ratedCapacity, battCount) ⇒

                log.info(s"${self.path} Total calc received")
                Try{
                    w.CalculateTotalBalanceTime(1, fn, ocvFilepath, ratedCapacity)
                } match {
                    case Success(t) ⇒
                        val v = t.getIntData
                        sender.!(Calculated(v))(context.parent)
                    case Failure(e) ⇒ log.error(e.getMessage)
                }
    }
}

object Calculator {
    sealed trait Calc
    final case class TotalCalc(filename: String, ocvFilepath: String, ratedCapacity: String, batteryCount: Int) extends Calc
}

logback. xml

<configuration debug="true">
   <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
        <!-- reset all previous level configurations of all j.u.l. loggers -->
        <resetJUL>true</resetJUL>
    </contextListener>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>/var/log/app.log</file>
        <append>true</append>

        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
          <!-- daily rollover -->
              <fileNamePattern>/var/log/app.%d{yyyy-MM-dd}.log</fileNamePattern>

              <!-- keep 30 days' worth of history capped at 3GB total size -->
              <maxHistory>100</maxHistory>
              <totalSizeCap>30000MB</totalSizeCap>
        </rollingPolicy>

        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %M %L - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="FILE" />
        <queueSize>500</queueSize>
        <includeCallerData>true</includeCallerData>
    </appender>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %M %L - %msg%n</pattern>
        </encoder>
    </appender>


    <logger name="application" level="DEBUG"/>
    <root level="INFo">
        <appender-ref ref="ASYNCFILE"/>
    </root>
</configuration>


'20 4 марта Обновление

Спасибо @ anand-sai. После того, как я поместил akka.loggers-dispatcher = "calc.my-pinned-dispatcher" в файл conf, я получил my-pinned-dispatcher-xx в качестве имени потока в каждой строке файла журнала. Я подумал, что имя потока должно указывать на поток, в котором метод receive субъекта калькулятора выполняет, в данном случае, что-то похожее на 'pinned-dispatcher-xx', поскольку поток был получен закрепленным диспетчером в моей конфигурации. Теперь это доказывает, что он указывает поток, полученный диспетчером логгера. Если это так, как записать имя потока для кода обработчика сообщений актера?

1 Ответ

1 голос
/ 03 марта 2020

Я думаю, что решение состоит в том, чтобы добавить akka.loggers-dispatcher в ваш application.conf

calc.my-pinned-dispatcher {
    executor="thread-pool-executor"
    type=PinnedDispatcher
  }
akka.loggers-dispatcher = "calc.my-pinned-dispatcher"

Если вы ищете logger-dispatcher в конфигурации по умолчанию akka, вы найдете значение должно быть "akka.actor.default-dispatcher`, и нам нужно переопределить этот конфиг, как показано выше.

EDIT

ActorLogging является асинхронным. Когда вы регистрируетесь используя ActorLogging, он отправляет сообщение субъекту ведения журнала, который по умолчанию выполняется диспетчером по умолчанию. Logback регистрирует поток, вызвавший его, который будет потоком участника ActorLogging, а не потоком вашего субъекта. Для достижения этой цели существует является так называемым Mapped Diagnosti c Context (MD C), который захватывает akka source (Путь субъекта, в котором была выполнена регистрация) , source thread ( поток, в котором было выполнено ведение журнала) и многое другое, в котором было выполнено ведение журнала.

Как указано в документации :

С ведение журнала выполняется асинхронно поток, в котором было выполнено ведение журнала, записывается в MD C с именем атрибута sourceThread.

Путь субъекта, в котором было выполнено ведение журнала, доступен в MD C с именем атрибута akkaSource.

Системное имя субъекта, в котором было выполнено ведение журнала, доступно в MD C с именем атрибута sourceActorSystem, но обычно оно также включено в атрибут akkaSource.

Адрес системы субъекта, содержащий хост и порт, если система использует кластер, доступен через akkaAddress.

Для типизированных акторов временная метка события журнала берется, когда был выполнен вызов журнала, но внутренняя регистрация Akka, а также запись акторов classi c является асинхронной, что означает, что временная метка записи журнала берется с момента, когда вызывается базовая реализация регистратора, что может сначала удивить. Если вы хотите более точно вывести временную метку для таких регистраторов, используйте атрибут MD C akkaTimestamp. Обратите внимание, что ключ MD C не будет иметь никакого значения для типизированного актера.

Дайте мне знать, если это поможет !!

...