Akka.Net Streams - источник перестает вытягивать элементы, когда ошибка выбрасывает буфер - PullRequest
0 голосов
/ 11 октября 2018

Я немного поиграл с пакетом расширения streams для Akka.Net и заметил эту ошибку при попытке объединить методы буфера и газа:

using (var system = ActorSystem.Create("test-system"))
using (var materializer = system.Materializer(GetSettings(system)))
{
            int index = 0;
            var sink = Sink.ActorRefWithAck<KeyValue>(
                system.ActorOf<Writer>(), 
                new OnInitMessage(), 
                new OnAcknowledgeMessage(), 
                OnComplete.Instance, 
                exception => new OnError(exception));

            ServiceBusSource
                .Create(client, message =>
                {
                    var json = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
                    var result = JsonConvert.DeserializeObject<KeyValue>(json);

                    message.Complete();

                    return result;
                })
                .WithLogger(system, entity => $"{entity.Key} => {entity.Value}")
                .Buffer(1, OverflowStrategy.Fail)
                .Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping)
                .ToMaterialized(sink, Keep.Right)
                .Run(materializer);

            Console.ReadLine();
}

I'mиспользуя ServiceBusSource от Alpakka Это пакеты, на которые я ссылаюсь:

  • Akka.Streams: 1.3.1
  • Akka.Streams.Azure.ServiceBus: 0.1.0
  • WindowsAzure.ServiceBus: 4.1.3

Я намеренно заставляю его потерпеть неудачу, чтобы увидеть, как ведет себя НО после сбоя стратегии буфера поток завершается и элементы больше не извлекаются.

KeyValue.cs

public class KeyValue
{
    public int Id { get; set; }

    public string Key { get; set; }

    public string Value { get; set; }

    public DateTime Produced { get; set; }

    public DateTime Emitted { get; set; }

    public override string ToString()
    {
        return $"[{Produced}] - [{Emitted}] => {Id} {Key}:{Value}";
    }
}

GetSettings Метод:

ActorMaterializerSettings GetSettings(ActorSystem system)
        {
            return ActorMaterializerSettings.Create(system)
                .WithSupervisionStrategy(cause =>
                {
                    system.Log.Error(cause, "Failed");
                    return Directive.Resume;
                });
        }

1 Ответ

0 голосов
/ 19 октября 2018

Существует несколько способов обработки ошибок внутри потока - большинство из них описано в docs :

  1. Используйте Recover, чтобы сделать резервное событие из-за ошибки.
  2. Используйте RecoverWithRetries, чтобы разрешить перенаправление в другой поток при ошибке.
  3. Используйте Restart.WithBackoff, чтобы перестроить повторный поток после экспоненциальной задержки отката.
  4. Использовать WithSupervisionStrategy- это очень ограниченный вариант, поскольку он работает только на этапах, которые ссылаются на него явно (как описано в документации).

Ваш случай задуман - когда вы используете OverflowStrategy.Fail, это означает,что при достижении переполнения будет выдана ошибка.Реакция большинства этапов akka заключается в немедленном закрытии потока при сбое.

...