Как я могу запретить моему актеру Akka. net блокировать всю мою систему актеров? - PullRequest
0 голосов
/ 27 января 2020

У меня есть проект C# Akka. net с участием 6 актеров. Один субъект (LoggingActor) отвечает за взятие объектов и их передачу на веб-сервер.

Всякий раз, когда LoggingActor сталкивается с сетевым сбоем, кажется, что узкое место моей всей системы акторов; все останавливается, пока LoggingActor не завершит свой метод.

Метод блокировки в LoggingActor содержит следующее:

try
{
    var rs = new RemoteServer();
    var rsr = rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
        .Result;
    if (rsr != HttpStatusCode.OK)
    {
        Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
    }
}
catch (HttpRequestException httpEx)
{
    Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
    MovementTransmitBuffer.Add(editedMovement);
}

И функция SendMovement выглядит следующим образом:

public async Task<HttpStatusCode> SendMovement(Movement movement, string siteToken, string serverUrl)
{ 
    //Get remote server
    var createMovementUrl = serverUrl + MovementsRoute + "?site_token=" + siteToken;
    var jsonString = "{ \"movement\": { \"approach\": \"" + movement.Approach + "\", \"exit\": \"" + movement.Exit + "\", \"turntype\": \"" + movement.TurnType + "\", \"objectclass\": \"" + movement.TrafficObjectType + "\", \"created_at\": " + movement.Timestamp.Ticks +  "} }";
    var content = new StringContent(jsonString, Encoding.UTF8, "application/json");

    //Transmit
    var response = await Client.PostAsync(createMovementUrl, content);
    return response.StatusCode;
}

Цель LoggingActor - попытаться отправить свои объекты на сервер и буферизовать их для последующей отправки, если первоначальная попытка не удалась. Не имеет значения, в каком порядке они передаются для моего приложения.

Как я могу изменить LoggingActor или функцию SendMovement, чтобы позволить LoggingActor «проваливаться» через функцию обработки сообщений LoggingActor отправив запрос POST перед ожиданием ответа?

Я думаю, что мне нужен ответ сервера POST (или исключение), чтобы инициировать другое сообщение для субъекта, который сообщает субъекту либо удалить его из очереди отправки или оставить ее (т.е. ничего не делать).

1 Ответ

1 голос
/ 27 января 2020

Вы звоните rs.SendMovement, который возвращает задачу, для которой вы вызываете свойство Result. Это заблокирует весь поток, живущий в текущем пуле потоков - поскольку пул потоков используется всеми участниками / задачами, использующими его, и обычно он содержит 1 рабочий поток на ядро ​​ЦП. Это означает, что вы фактически отключили все ядро ​​ЦП до тех пор, пока удаленный конец не ответит (или не истечет время ожидания).

В общем случае никогда не следует блокировать поток при работе с асинхронным кодом. В Akka. NET работа с API на основе задач может быть выполнена одним из 2 способов.

Использование ReceiveAsync

Первое решение состоит в том, чтобы ваш актер наследовал от ReceiveActor и используйте метод ReceiveAsync для определения вашего обработчика сообщений:

class MyActor : ReceiveActor
{
   public MyActor() 
   {
      ReceiveAsync<MyMessage>(async message => {
         try
         {
            var rs = new RemoteServer();
            var rsr = await rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl);
            if (rsr != HttpStatusCode.OK)
            {
               Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
            }
         }
         catch (HttpRequestException httpEx)
         {
            Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
            MovementTransmitBuffer.Add(editedMovement);
         }
      });
   }
}

Это создаст что-то, известное как non-reentrant actor - это означает, что пока вы не заблокируете какой-либо нижележащий поток (чтобы он мог использоваться другими субъектами, работающими одновременно), вы БУДЕТЕ заблокировать текущего субъекта, не давая ему получать другие сообщения, пока текущий асинхронный c лямбда-обработчик не достигнет конца.

Использование PipeTo

Другой подход, который по умолчанию используется практически во всей внутренней архитектуре akka, заключается в использовании метода PipeTo вместо Task:

class MyActor : ActorBase
{
    // start with an actor in ready state
    protected override bool Receive(object message) => Ready(message);

    bool Ready(object message) 
    {
        switch (message)
        {
            case MyMessage msg:
                var rs = new RemoteServer();
                rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
                    .PipeTo(Self, 
                      success: rsr => new Status.Success(rsr), 
                      failure: ex => new Status.Failure(ex));
                Become(Waiting);
                return true;

            default: return false;
        }
    }

    bool Waiting(object message) 
    {
        switch (message)
        {
           case Status.Success success:
               var rsr = (HttpStatusCode)success.Status;
               if (rsr != HttpStatusCode.OK)
               {
                   Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
               }
               Become(Ready);
               return true;

           case Status.Failure failure:
               Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
               MovementTransmitBuffer.Add(editedMovement);
               Become(Ready);
               return true;

            default: return false;
        }
    }
}

Таким образом, вы поворачиваете ваш актер в пользовательский конечный автомат. Часть кода, ответственная за ожидание логики c, здесь разделена на два этапа - здесь представлены как Ready / Waiting состояний. После отправки асинхронного запроса c субъект меняет свое поведение - это означает, что теперь он может обрабатывать другой набор входящих сообщений или по-разному реагировать на них. Возвращенное логическое значение сообщает системе субъекта, обработано ли сообщение текущим субъектом или нет - это может вызвать необработанный вызов, который по умолчанию будет записывать необработанное сообщение.

Одним из преимуществ этого подхода является то, что этот субъект имеет значение reentrant - это означает, что, ожидая, пока SendMovement вернет результат (или потерпит неудачу), этот субъект может свободно принимать и обрабатывать другие сообщения.

...