У меня следующая проблема: на сервере запущена служба. У него есть таймер, который срабатывает каждые 5 секунд. Он моделируется для производства товаров для потребителя. Элементы потребляются в многопоточном режиме. Я добавил ведение журнала, но я не могу узнать, когда и где возникает проблема, и это просто блокирует. Никаких исключений или ошибок. Моя цель - продолжать получать запросы на обработку из базы данных и потреблять их. Таймер является производителем.
public class CustomProducerConsumer<T> : IDisposable
{
private readonly BlockingCollection<T> blockingCollection;
private readonly Action<T> consumeItem;
private readonly Task[] workers;
public CustomProducerConsumer(Action<T> consumeItem,
int degreeOfParallelism,
int capacity = 1024)
{
this.consumeItem = consumeItem;
this.blockingCollection = new BlockingCollection<T>(capacity);
this.workers = Enumerable.Range(1, degreeOfParallelism)
.Select(_ => Task.Factory.StartNew(Worker,
TaskCreationOptions.LongRunning))
.ToArray();
}
public void Dispose()
{
// Unblock all workers even if the client
// didn't call CompleteProcessing
if (!this.blockingCollection.IsAddingCompleted)
{
this.blockingCollection.CompleteAdding();
}
Task.WaitAll(this.workers);
this.blockingCollection.Dispose();
}
public void Process(T item)
{
this.blockingCollection.TryAdd(item);
}
private void Worker()
{
foreach (var item in this.blockingCollection.GetConsumingEnumerable())
{
this.consumeItem(item);
}
}
}
Вот мой код из службы:
private readonly BlockingCollection<StitchingRequestProcessingModel> requestsToBeProcessed =
new BlockingCollection<StitchingRequestProcessingModel>(10);
private readonly BlockingCollection<Dictionary<int, StitchingRequest[]>> pendingRequests =
new BlockingCollection<Dictionary<int, StitchingRequest[]>>(10);
private readonly Timer timer;
public Service()
{
InitializeComponent();
this.produceConsumer =
new CustomProducerConsumer<StitchingRequestModel>(this.ProcessItems,
Environment.ProcessorCount);
this.timer = new Timer(o =>
{
this.TimerElapsed();
this.timer.Change(TimeSpan.FromSeconds(5), Timeout.InfiniteTimeSpan);
}, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan);
this.ConsumeRequests();
}
public void TimerElapsed()
{
try
{
//this just adds into the list an item to ping the db for pending requests when available
this.requestsToBeProcessed.Add(new StitchingRequestProcessingModel());
this.pendingRequests.TryTake(out Dictionary<int, Request[]> requests);
if (requests == null)
{
return;
}
foreach (KeyValuePair<int, Request[]> request in requests)
{
this.produceConsumer.Process(new StitchingRequestModel(request));
}
}
catch (Exception exception)
{
this.errorLogger.Error(exception.Message);
}
}
private void ConsumeRequests()
{
Task.Factory.StartNew(() =>
{
while (!this.requestsToBeProcessed.IsCompleted)
{
if (this.tokenSource.Token.IsCancellationRequested)
{
break;
}
StitchingRequestProcessingModel data = null;
try
{
data = this.requestsToBeProcessed.Take();
}
catch (InvalidOperationException)
{
}
if (data == null)
{
continue;
}
try
{
// this just executes sql query to get those request from db
var requests = this.requestService.GetPendingRequests();
this.pendingRequests.Add(requests);
}
catch (Exception exception)
{
this.errorLogger.Error(exception.Message, "Failed to get pending requests");
}
}
},
this.tokenSource.Token,
TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
private void ProcessItems(StitchingRequestModel model)
{
foreach (StitchingRequest request in model.Requests)
{
this.requestsToBeProcessed.Add(new StitchingRequestProcessingModel(request);
}
}
Основная причина, по которой я поместил потребляющие элементы в блокирующую коллекцию, - это Nhibernate. Это вызывает у меня проблемы при многопоточности. Нет идей, что еще попробовать, и почему этот подход не работает. Я не хочу вызывать CompleteAdding при сборе блокировки, поскольку мне нужно, чтобы запросы добавлялись и обрабатывались в первом доступном потоке.
Таймер для каждого прошедшего события попытается создать ожидающий запрос, который будет добавлен в блокировку сбор и обработка в первый доступный ход. Служба работает 2 ~ 3 часа и просто останавливается. Метод ProcessItems может быть долгим. Процессор 8-ядерный.
ОБНОВЛЕНИЕ Добавлена отмена для задачи потребителя.