Чтение и запись в ConcurrentQueue между потоками (C #) - PullRequest
0 голосов
/ 14 декабря 2018

У меня есть приложение, в двух словах оно создает объекты типа "WebPage".

Эти объекты затем вставляются в базу данных SQL.

Я хочу получить эти записи из базы данных, а затем загрузить их в некоторые файлы.

Я создаю цикл While, чтобы прочитать результаты запроса, и для каждой возвращенной строки создается объект веб-страницы и добавляется в статический ConcurrentQueue.

Вот где моя проблема:

Я хочу создать отдельный поток, чтобы при появлении чего-то нового в ConcurrentQueue он отвечал и записывал объект в мой файл.У меня уже есть этот код, работающий в однопоточном и последовательном режиме, но я хочу ускорить его.

У меня в настоящее время есть фрагмент кода внутри считывателя из базы данных SQL, когда ConcurrentQueue достигает определенного количества объектов - он отправляет событие автоматического сброса (см. Ниже)

if(flow.CheckEngineCapacity >= 2000 || (Convert.ToInt32(totalRows) - numberOfRecords) < 2000)
{
          waitHandle.Set();
          Thread fileProcessor = new Thread(delegate () { flow.ProcessExportEngineFlow(waitHandle); });
          fileProcessor.Start();
 }

чтов конечном итоге происходит переключение контекста, когда основной поток, кажется, спит до тех пор, пока тот не завершится - я пытался попробовать работать с await и async, но подозреваю, что это не то, что мне нужно.

Как мне заставить работать его по следующему шаблону

  • Новый объект добавляется в ConcurrentQueue
  • Когда достигается определенное количество объектов в ConcurrentQueueначните удалять объекты из очереди и загружать их в файлы, добавляя при этом объекты в параллельную очередь

ПРИМЕЧАНИЕ: если параллельная очередь достигает определенного количества объектов, она должна блокироваться, пока поток, выполняющий удаление, не сможет освободитьсядо некоторого места.

Причина, по которой я это делаю, состоит в том, чтобы сделать решение как можно более быстрым - узкие места следует записывать в файлы и читать из базы данных.

Ниже приведен пример класса, который я имеюпытался соединить:

public class EngineFlow
{
    private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

    public bool IncreaseEngineFlow(WebPages page)
    {
        bool sucessfullyadded = false;
        if (_concurrentWebPageList.Count <= 2000)
        {
            _concurrentWebPageList.Enqueue(page);
            sucessfullyadded = true;
        }
        else
        {
            return sucessfullyadded;
        }
        return sucessfullyadded;
    }

    public int CheckEngineCapacity { get { return _concurrentWebPageList.Count; } }

    private WebPages DecreaseEngineFlow()
    {
        WebPages page;
        _concurrentWebPageList.TryDequeue(out page);
        return page;
    }

    public void ProcessExportEngineFlow(AutoResetEvent waitHandle)
    {
        if (waitHandle.WaitOne() == false)
        {
            Thread.Sleep(100);
        }
        else
        {
            while (!_concurrentWebPageList.IsEmpty)
            {
                Console.WriteLine(DecreaseEngineFlow().URL);
                Console.WriteLine(CheckEngineCapacity);
                waitHandle.Set();
            }
        }
    }

Первоначально это было предназначено, чтобы быть производителем и потребителем, но я чувствую, что, возможно, я обдумываю это.

1 Ответ

0 голосов
/ 14 декабря 2018

Спасибо @Henk Holterman

В новом классе использовалась BlockingCollection, которая решала все проблемы:

 Task.Run(() =>
        {
            flow.ProcessExportEngineFlow();
        });

Task.Run(() =>
                {
                    while (reader.Read())
                    {
                           flow.IncreaseEngineFlow(webpage);
                    }

Определение класса:

 private BlockingCollection<WebPages> _concurrentWebPageList = new BlockingCollection<WebPages>(new ConcurrentQueue<WebPages>(), 1000);
    //private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

    public void IncreaseEngineFlow(WebPages page)
    {
        _concurrentWebPageList.Add(page);
    }

    public WebPages DecreaseEngineFlow()
    {
        return _concurrentWebPageList.Take();
    }

    public void ProcessExportEngineFlow()
    {
        while(!_concurrentWebPageList.IsCompleted)
        {
            WebPages page = null;
            try
            {
                page = _concurrentWebPageList.Take();
            }
            catch (InvalidOperationException) { }

            if(page != null)
            {
                Console.WriteLine(page.URL);
            }
        }
    }

    public bool GetEngineState()
    {
        return _concurrentWebPageList.IsCompleted;
    }

    public void SetEngineCompleted()
    {
        _concurrentWebPageList.CompleteAdding();
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...