Пользовательский производитель-потребитель случайным образом останавливается при использовании блокировки сбора - PullRequest
0 голосов
/ 03 августа 2020

У меня следующая проблема: на сервере запущена служба. У него есть таймер, который срабатывает каждые 5 секунд. Он моделируется для производства товаров для потребителя. Элементы потребляются в многопоточном режиме. Я добавил ведение журнала, но я не могу узнать, когда и где возникает проблема, и это просто блокирует. Никаких исключений или ошибок. Моя цель - продолжать получать запросы на обработку из базы данных и потреблять их. Таймер является производителем.

public class CustomProducerConsumer<T> : IDisposable
{
    private readonly BlockingCollection<T> blockingCollection;
    private readonly Action<T> consumeItem;
    private readonly Task[] workers;

    public CustomProducerConsumer(Action<T> consumeItem,
        int degreeOfParallelism,
        int capacity = 1024)
    {
        this.consumeItem = consumeItem;
        this.blockingCollection = new BlockingCollection<T>(capacity);
        this.workers = Enumerable.Range(1, degreeOfParallelism)
                                .Select(_ => Task.Factory.StartNew(Worker,
                                    TaskCreationOptions.LongRunning))
                                .ToArray();
    }

    public void Dispose()
    {
        // Unblock all workers even if the client
        // didn't call CompleteProcessing
        if (!this.blockingCollection.IsAddingCompleted)
        {
            this.blockingCollection.CompleteAdding();
        }

        Task.WaitAll(this.workers);
        this.blockingCollection.Dispose();
    }

    public void Process(T item)
    {
        this.blockingCollection.TryAdd(item);
    }

    private void Worker()
    {
        foreach (var item in this.blockingCollection.GetConsumingEnumerable())
        {
            this.consumeItem(item);
        }
    }
}

Вот мой код из службы:

private readonly BlockingCollection<StitchingRequestProcessingModel> requestsToBeProcessed =
        new BlockingCollection<StitchingRequestProcessingModel>(10);

    private readonly BlockingCollection<Dictionary<int, StitchingRequest[]>> pendingRequests =
        new BlockingCollection<Dictionary<int, StitchingRequest[]>>(10);

    private readonly Timer timer;

    public Service()
    {
        InitializeComponent();

        this.produceConsumer =
            new CustomProducerConsumer<StitchingRequestModel>(this.ProcessItems,
                Environment.ProcessorCount);

        this.timer = new Timer(o =>
        {
            this.TimerElapsed();

            this.timer.Change(TimeSpan.FromSeconds(5), Timeout.InfiniteTimeSpan);
        }, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan);
        this.ConsumeRequests();
    }

public void TimerElapsed()
    {
        try
        {
//this just adds into the list an item to ping the db for pending requests when available
            this.requestsToBeProcessed.Add(new StitchingRequestProcessingModel());
            this.pendingRequests.TryTake(out Dictionary<int, Request[]> requests);
            if (requests == null)
            {
                return;
            }
            foreach (KeyValuePair<int, Request[]> request in requests)
            {
                this.produceConsumer.Process(new StitchingRequestModel(request));
            }
        }
        catch (Exception exception)
        {
            this.errorLogger.Error(exception.Message);
        }
    }

    private void ConsumeRequests()
    {
        Task.Factory.StartNew(() =>
            {
                while (!this.requestsToBeProcessed.IsCompleted)
                {

                    if (this.tokenSource.Token.IsCancellationRequested)
                    {
                        break;
                    }

                    StitchingRequestProcessingModel data = null;
                    try
                    {
                        data = this.requestsToBeProcessed.Take();
                    }
                    catch (InvalidOperationException)
                    {
                    }

                    if (data == null)
                    {
                        continue;
                    }

                    try
                    {
                        // this just executes sql query to get those request from db
                        var requests = this.requestService.GetPendingRequests();
                        this.pendingRequests.Add(requests);
                    }
                    catch (Exception exception)
                    {
                        this.errorLogger.Error(exception.Message, "Failed to get pending requests");
                    }
                }
            },
            this.tokenSource.Token,
            TaskCreationOptions.LongRunning, TaskScheduler.Current);
    }

    private void ProcessItems(StitchingRequestModel model)
    {
        foreach (StitchingRequest request in model.Requests)
        {
            this.requestsToBeProcessed.Add(new StitchingRequestProcessingModel(request);
        }
    }

Основная причина, по которой я поместил потребляющие элементы в блокирующую коллекцию, - это Nhibernate. Это вызывает у меня проблемы при многопоточности. Нет идей, что еще попробовать, и почему этот подход не работает. Я не хочу вызывать CompleteAdding при сборе блокировки, поскольку мне нужно, чтобы запросы добавлялись и обрабатывались в первом доступном потоке.

Таймер для каждого прошедшего события попытается создать ожидающий запрос, который будет добавлен в блокировку сбор и обработка в первый доступный ход. Служба работает 2 ~ 3 часа и просто останавливается. Метод ProcessItems может быть долгим. Процессор 8-ядерный.

ОБНОВЛЕНИЕ Добавлена ​​отмена для задачи потребителя.

1 Ответ

0 голосов
/ 06 августа 2020

Решена проблема с отсутствием работы с entity-объектом между потребителем и производителем. Создан dto для необходимой информации.

...