Как работать с многопоточностью с ConcurrentQueue <T> - PullRequest
18 голосов
/ 29 декабря 2010

Я пытаюсь выяснить, каким будет лучший способ работы с очередью. У меня есть процесс, который возвращает DataTable. Каждая DataTable, в свою очередь, объединяется с предыдущей DataTable. Есть одна проблема, слишком много записей для хранения до окончательного BulkCopy (OutOfMemory).

Итак, я решил, что должен обрабатывать каждый входящий DataTable немедленно. Думая о ConcurrentQueue<T> ... но я не понимаю, как метод WriteQueuedData() узнал бы, что таблица в порядке очереди и записала ее в базу данных.

Например:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

Мой первый вопрос, кроме того факта, что у меня фактически нет никаких событий для подписки, если я вызываю ExtractData() асинхронно, это все, что мне нужно? Во-вторых, я что-то упускаю из-за того, что ConcurrentQueue<T> функционирует и нуждается в какой-то форме триггера для асинхронной работы с объектами в очереди?

Обновление Я только что получил класс из ConcurrentQueue<T>, который имеет обработчик события OnItemQueued. Тогда:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Есть какие-либо опасения по поводу этой реализации?

Ответы [ 3 ]

24 голосов
/ 29 декабря 2010

Из моего понимания проблемы вам не хватает нескольких вещей.

Параллельная очередь - это структура данных, предназначенная для чтения и записи в несколько потоков без необходимости явной блокировки структуры данных. (Весь этот джаз решается за кулисами, или коллекция реализована таким образом, что ей не нужно брать блокировку.)

Имея это в виду, похоже, что шаблон, который вы пытаетесь использовать, является "Производителем / Потребителем". Во-первых, у вас есть несколько задач по созданию работы (и добавлению элементов в очередь). А во-вторых, у вас есть вторая задача. Поглощение вещей из очереди (и удаление предметов).

Так что вам действительно нужны две темы: одна добавляет элементы, а вторая удаляет элементы. Поскольку вы используете параллельную коллекцию, у вас может быть несколько потоков, добавляющих элементы, и несколько потоков, удаляющих элементы. Но очевидно, что чем больше у вас конфликтов в параллельной очереди, тем быстрее это станет узким местом.

16 голосов
/ 18 июля 2014

Я думаю, ConcurrentQueue полезно только в очень немногих случаях.Его главное преимущество в том, что он не блокируется.Однако обычно поток (-ы) производителя должен каким-либо образом информировать поток (-ы) потребителя о наличии данных, доступных для обработки.Эта сигнализация между потоками требует блокировок и сводит на нет преимущества использования ConcurrentQueue.Самый быстрый способ синхронизации потоков - это использование Monitor.Pulse(), которое работает только в пределах блокировки.Все другие инструменты синхронизации работают медленнее.

Конечно, потребитель может просто постоянно проверять, есть ли что-то в очереди, которое работает без блокировок, но является огромной тратой ресурсов процессора.Немного лучше, если потребитель ждет между проверками.

Поднять поток при записи в очередь - очень плохая идея.Использование ConcurrentQueue для сохранения, возможно, 1 микросекунды будет полностью потрачено впустую, если выполнить eventhandler, что может занять в 1000 раз больше времени.

Если вся обработка выполняется в обработчике события или асинхронном вызове, вопросзачем еще очередь нужна?Лучше передать данные непосредственно обработчику и вообще не использовать очередь.

Обратите внимание, что реализация ConcurrentQueue довольно сложна для обеспечения параллелизма.В большинстве случаев лучше использовать обычный Queue<> и блокировать каждый доступ к очереди.Поскольку доступ к очереди требует только микросекунд, крайне маловероятно, что 2 потока будут обращаться к очереди за одну и ту же микросекунду и вряд ли когда-либо будет задержка из-за блокировки.Использование обычного Queue<> с блокировкой часто приводит к более быстрому выполнению кода, чем ConcurrentQueue.

3 голосов
/ 29 декабря 2010

Это полное решение для того, что я придумал:

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with table
        //  I can't figure out is how to pass custom object in 3rd parameter
        RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename);
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
            XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());

            using (var fileStream = new FileStream(file, FileMode.Create))
                fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2);
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    new public void Enqueue (DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    #region Fields
    #endregion

    #region Init
    public TableQueuedEventArgs(DataTable Table)
    {this.Table = Table;}
    #endregion

    #region Functions
    #endregion

    #region Properties
    public DataTable Table
    {get;set;}
    #endregion
}

В качестве доказательства концепции, похоже, он работает довольно хорошоМаксимум я видел 4 рабочих потока.

...