Akka.net - потоки с параллелизмом, противодавлением и ActorRef - PullRequest
0 голосов
/ 17 декабря 2018

Пытаюсь узнать, как использовать потоки Akka.net для параллельной обработки элементов из Source.Queue с обработкой, выполненной в Actor.

Мне удалось заставить его работать с вызовомфункция с Sink.ForEachParallel, и она работает как положено.

Возможно ли обрабатывать элементы параллельно с Sink.ActorRefWithAck (как я бы предпочел, чтобы он использовал противодавление)?

1 Ответ

0 голосов
/ 17 декабря 2018

При нажатии кнопки Post при попытке объединить предыдущие попытки и альт!

Предыдущие попытки с ForEachParallel потерпели неудачу, когда я попытался создать актера внутри, но не смог сделать это в асинхронной функции.Если я использую один ранее объявленный актер, то Tell будет работать, но я не смог получить желаемый параллелизм.

Я получил его для работы с маршрутизатором с конфигурацией roundrobin.

var props = new RoundRobinPool(5).Props(Props.Create<MyActor>());
var actor = Context.ActorOf(props);

flow = Source.Queue<Element>(2000,OverflowStrategy.Backpressure)            
.Select(x => {
 return new Wrapper() { Element = x, Request = ++cnt };
})
.To(Sink.ForEachParallel<Wrapper>(5, (s) => { actor.Tell(s); }))
.Run(materializer);

Запрос ++ cnt предназначен для вывода на консоль для проверки того, что запросы обрабатываются должным образом.

MyActor имеет длительную задержку при каждом 10-м запросе для проверки того, что противодавление работает.

...