Я думаю, у меня есть лучший пример - мне нужно получить несколько тысяч записей о билетах от авиакомпаний (на самом деле GDS).Для этого мне нужно установить дорогой сеанс , прежде чем я смогу отправить запрос SOAP или REST.Сеансы ограничены, поэтому я действительно не хочу создавать новый для каждого билета.Это удваивает время, необходимое для запроса и тратит деньги и ресурсы.
Создание пользовательского блока может показаться решением, но на самом деле не это хорошо.Потоки данных устанавливают конвейеры блоков обработки, которые работают с потоком сообщений.Попытка заставить их работать по-другому столкнется с фундаментальными допущениями модели потока данных.
Например, задачи используются для параллелизма, регулирования и распределения нагрузки - опция MaxMessagesPerTask убивает задачи после получения максимального количества сообщений, чтобы одна задача не загружала процессор длядолго.Создание и уничтожение сеансов для каждой задачи может привести к поломке этого механизма. и в итоге создадут больше сеансов, чем необходимо.
Объединение в пул
Один из способов справиться с этим,заключается в использовании пула объектов, снабженного «дорогими» объектами, которые будут использоваться блоками, в данном случае Sessions.Как ни странно, пакет Microsoft.Extensions.ObjectPool предлагает именно такой пул.Документы не существуют , они обманчиво помещаются в дерево ASP.NET
, но это отдельный пакет .NET Standard 2.0.Github source обманчиво прост, и класс использует Interlocked.CompareExchange, чтобы избежать блокировки.Есть даже реализация LeakTrackingObjectPool.
Если бы я знал об этом в прошлом, я мог бы написать:
var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());
Политика DefaultPooledObjectPolicy просто использует new
длясоздать новый экземпляр.Однако легко создать новую политику, например такую, которая использует собственную логику создания или даже фабричный метод:
public class SessionPolicy : DefaultPooledObjectPolicy<Session>
{
public override Session Create()
{
//Do whatever is needed here
return session;
}
}
Перенаправление
Другой вариант заключается в использовании несколько блоков экземпляров и исходный блок для ссылки на все из них.Чтобы избежать отправки всех сообщений в первый блок, необходима ограниченная емкость.Допустим, у нас есть этот фабричный метод:
TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)
{
var session=CreateSomeSessionFrom(someSettings);
var bounded=new DataflowBlockOptions {BoundedCapacity =1};
return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);
}
И мы используем его для создания нескольких блоков:
_blocks=Enumerable.Range(0,10)
.Select(_=>CreateThatBlockWithSession(settings))
.ToArray();
Исходный блок может подключаться ко всем этим блокам:
foreach(var target in _blocks)
{
_source.LinkTo(target,options);
}
А затем свяжите все эти блоки со следующим блоком.Сложность в том, что мы не можем просто распространять завершение.Если один из блоков завершен, он заставит следующий блок завершиться, даже если в других блоках ожидают сообщения.
Решение состоит в том, чтобы использовать Task.WhenAll
и ContinueWith
для передачи завершения доследующий блок:
foreach(var target in _blocks)
{
target.LinkTo(_nextBlock);
}
var allTasks=_blocks.Select(blk=>blk.Completion);
Task.WhenAll(allTasks)
.ContinueWith(_=>_nextBlock.Complete());
Более надежная реализация будет проверять состояние всех задач IsFaulted
и вызывать Fault()
в следующем блоке, если одна из них не будет выполнена