У меня есть приложение, в двух словах оно создает объекты типа "WebPage".
Эти объекты затем вставляются в базу данных SQL.
Я хочу получить эти записи из базы данных, а затем загрузить их в некоторые файлы.
Я создаю цикл While, чтобы прочитать результаты запроса, и для каждой возвращенной строки создается объект веб-страницы и добавляется в статический ConcurrentQueue.
Вот где моя проблема:
Я хочу создать отдельный поток, чтобы при появлении чего-то нового в ConcurrentQueue он отвечал и записывал объект в мой файл.У меня уже есть этот код, работающий в однопоточном и последовательном режиме, но я хочу ускорить его.
У меня в настоящее время есть фрагмент кода внутри считывателя из базы данных SQL, когда ConcurrentQueue достигает определенного количества объектов - он отправляет событие автоматического сброса (см. Ниже)
if(flow.CheckEngineCapacity >= 2000 || (Convert.ToInt32(totalRows) - numberOfRecords) < 2000)
{
waitHandle.Set();
Thread fileProcessor = new Thread(delegate () { flow.ProcessExportEngineFlow(waitHandle); });
fileProcessor.Start();
}
чтов конечном итоге происходит переключение контекста, когда основной поток, кажется, спит до тех пор, пока тот не завершится - я пытался попробовать работать с await и async, но подозреваю, что это не то, что мне нужно.
Как мне заставить работать его по следующему шаблону
- Новый объект добавляется в ConcurrentQueue
- Когда достигается определенное количество объектов в ConcurrentQueueначните удалять объекты из очереди и загружать их в файлы, добавляя при этом объекты в параллельную очередь
ПРИМЕЧАНИЕ: если параллельная очередь достигает определенного количества объектов, она должна блокироваться, пока поток, выполняющий удаление, не сможет освободитьсядо некоторого места.
Причина, по которой я это делаю, состоит в том, чтобы сделать решение как можно более быстрым - узкие места следует записывать в файлы и читать из базы данных.
Ниже приведен пример класса, который я имеюпытался соединить:
public class EngineFlow
{
private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();
public bool IncreaseEngineFlow(WebPages page)
{
bool sucessfullyadded = false;
if (_concurrentWebPageList.Count <= 2000)
{
_concurrentWebPageList.Enqueue(page);
sucessfullyadded = true;
}
else
{
return sucessfullyadded;
}
return sucessfullyadded;
}
public int CheckEngineCapacity { get { return _concurrentWebPageList.Count; } }
private WebPages DecreaseEngineFlow()
{
WebPages page;
_concurrentWebPageList.TryDequeue(out page);
return page;
}
public void ProcessExportEngineFlow(AutoResetEvent waitHandle)
{
if (waitHandle.WaitOne() == false)
{
Thread.Sleep(100);
}
else
{
while (!_concurrentWebPageList.IsEmpty)
{
Console.WriteLine(DecreaseEngineFlow().URL);
Console.WriteLine(CheckEngineCapacity);
waitHandle.Set();
}
}
}
Первоначально это было предназначено, чтобы быть производителем и потребителем, но я чувствую, что, возможно, я обдумываю это.