Как мне реализовать свой собственный продвинутый сценарий «производитель / потребитель»? - PullRequest
1 голос
/ 10 ноября 2010

Примечание:
я сделал полную переработку моего вопроса. Вы можете увидеть исходный вопрос через историю изменений.


Мне нужна «могучая» очередь, которая обеспечивает следующие функциональные возможности:

  • У меня есть определенная область видимости для группы объектов. это означает, что Группа A , Группа B , ... будет иметь собственную очередь
  • Я заполняю очередь в потоке групповой области Поток A (Производитель)
  • Я читаю очередь в потоке групповой области Поток B (Потребитель)

, поэтому у меня будут следующие сценарии:

  1. есть и не будет элемента в очереди (так как задания вызывались с пустой «целевой группой»): Поток B должен выйти из цикла
  2. в данный момент в очереди нет элемента, так как Поток A работает над элементом для постановки в очередь: Поток B должен ждать
  3. в очереди есть элементы: Поток B должен иметь возможность удалить из очереди и обработать элемент
  4. в очереди нет элемента, так как В потоке A больше нет элементов для постановки в очередь: Нить B должна выйти из цикла

Теперь я придумал следующую реализацию:

public class MightyQueue<T>
  where T : class
{
    private readonly Queue<T> _queue = new Queue<T>();

    private bool? _runable;
    private volatile bool _completed;

    public bool Runable
    {
        get
        {
            while (!this._runable.HasValue)
            {
                Thread.Sleep(100);
            }
            return this._runable ?? false;
        }
        set
        {
            this._runable = value;
        }
    }

    public void Enqueue(T item)
    {
        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        this._queue.Enqueue(item);
    }

    public void CompleteAdding()
    {
        this._completed = true;
    }

    public bool TryDequeue(out T item)
    {
        if (!this.Runable)
        {
            item = null;
            return false;
        }
        while (this._queue.Count == 0)
        {
            if (this._completed)
            {
                item = null;
                return false;
            }
            Thread.Sleep(100);
        }
        item = this._queue.Dequeue();
        return true;
    }
}

, который затем будет использоваться

1055 ** * Производитель 1056 ** * 1057

if (anythingToWorkOn)
{
    myFooMightyQueueInstance.Runable = false;
}
else
{
    myFooMightyQueueInstance.Runable = true;
    while (condition)
    {
        myFooMightyQueueInstance.Enqueue(item);
    }
    myFooMightyQueueInstance.CompleteAdding();
}

Потребитель

if (!myFooMightyQueueInstance.Runable)
{
    return;
}

T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
    //work with item
}

но я считаю, что этот подход неверен, так как я использую некоторые Thread.Sleep() -материалы там (не может быть какой-то waitHandle или что-то еще?) ... я не о самом алгоритме или ... Может кто-нибудь помочь мне?

Ответы [ 5 ]

2 голосов
/ 10 ноября 2010

Если у вас есть .Net 4.0, используйте BlockingCollection. Он разбирается со всей этой неразберихой, включая конечную точку, методом CompleteAdding.

Если у вас есть более ранняя версия .Net, то обновите ее (т. Е. Мне лень объяснять, как реализовать то, что для вас уже сделано).

РЕДАКТИРОВАТЬ: Я не думаю, что ваша проблема вообще требует многопоточности. Просто создайте все электронные письма заблаговременно и спите до назначенного времени.

1 голос
/ 10 ноября 2010

Я написал простой пример, который отлично работает для меня и должен подходить для ваших сценариев.Если потребитель работает, это зависит от того, как задана переменная, но вы легко можете изменить ее на более сложное условие, такое как «если почта не существует, но кто-то сказал, что мне нужно больше».

public class MailSystem
{
    private readonly Queue<Mail> mailQueue = new Queue<Mail>();
    private bool running;
    private Thread consumerThread;

    public static void Main(string[] args)
    {
        MailSystem mailSystem = new MailSystem();
        mailSystem.StartSystem();
    }

    public void StartSystem()
    {
        // init consumer
        running = true;
        consumerThread = new Thread(ProcessMails);
        consumerThread.Start();
        // add some mails
        mailQueue.Enqueue(new Mail("Mail 1"));
        mailQueue.Enqueue(new Mail("Mail 2"));
        mailQueue.Enqueue(new Mail("Mail 3"));
        mailQueue.Enqueue(new Mail("Mail 4"));
        Console.WriteLine("producer finished, hit enter to stop consumer");
        // wait for user interaction
        Console.ReadLine();
        // exit the consumer
        running = false;
        Console.WriteLine("exited");
    }

    private void ProcessMails()
    {
        while (running)
        {
            if (mailQueue.Count > 0)
            {
                Mail mail = mailQueue.Dequeue();
                Console.WriteLine(mail.Text);
                Thread.Sleep(2000);
            }
        }
    }
}

internal class Mail
{
    public string Text { get; set; }

    public Mail(string text)
    {
        Text = text;
    }
}
1 голос
/ 10 ноября 2010

Вы должны начать с общей очереди Producer-Consumer и использовать ее.Реализация этого внутри очереди не очень хорошая идея, так как это не позволяет вам использовать семафоры для сигнализации потоков (или вы можете иметь открытые семафоры в своей очереди, но это действительно плохая идея).

Как только поток A поставил в очередь один рабочий элемент, он должен сообщить семафору, чтобы уведомить поток B. Когда поток B завершил обработку всех элементов, он должен сообщить семафору, чтобы уведомить всех остальных о том, что он завершил.Ваш главный поток должен ждать, пока этот второй семафор узнает, что все сделано.

[Edit]

Во-первых, у вас есть производитель и потребитель:

public interface IProducer<T> : IStoppable
{
    /// <summary>
    /// Notifies clients when a new item is produced.
    /// </summary>
    event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}

public interface IConsumer<T> : IStoppable
{
    /// <summary>
    /// Performs processing of the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    void ConsumeItem(T item);
}

public interface IStoppable
{
    void Stop();
}

Итак, в вашем случае класс, создающий почту, должен будет запустить событие ItemProduced, а класс, отправляющий его, должен будет реализовать ConsumeItem.

И тогда вы передадитеэти два экземпляра для экземпляра Worker:

public class Worker<T>
{
    private readonly Object _lock = new Object();
    private readonly Queue<T> _queuedItems = new Queue<T>();
    private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
    private readonly IProducer<T> _producer;
    private readonly IConsumer<T> _consumer;
    private volatile bool _ending = false;
    private Thread _workerThread;

    public Worker(IProducer<T> producer, IConsumer<T> consumer)
    {
        _producer = producer;
        _consumer = consumer;
    }

    public void Start(ThreadPriority priority)
    {
        _producer.ItemProduced += Producer_ItemProduced;
        _ending = false;

        // start a new thread
        _workerThread = new Thread(new ThreadStart(WorkerLoop));
        _workerThread.IsBackground = true;
        _workerThread.Priority = priority;
        _workerThread.Start();
    } 

    public void Stop()
    {
        _producer.ItemProduced -= Producer_ItemProduced;
        _ending = true;

        // signal the consumer, in case it is idle
        _itemReadyEvt.Set();
        _workerThread.Join();
    }

    private void Producer_ItemProduced
         (object sender, ProducedItemEventArgs<T> e)
    {
        lock (_lock) { _queuedItems.Enqueue(e.Item); }

        // notify consumer thread
        _itemReadyEvt.Set();
    }

    private void WorkerLoop()
    {
        while (!_ending)
        {
            _itemReadyEvt.WaitOne(-1, false);

            T singleItem = default(T);
            lock (_lock)
            {
               if (_queuedItems.Count > 0)
               {
                   singleItem = _queuedItems.Dequeue();
               }
            }


            while (singleItem != null)
            {
                try
                {
                    _consumer.ConsumeItem(singleItem);
                }
                catch (Exception ex)
                {
                    // handle exception, fire an event
                    // or something. Otherwise this
                    // worker thread will die and you
                    // will have no idea what happened
                }

                lock (_lock)
                {
                    if (_queuedItems.Count > 0)
                    {
                        singleItem = _queuedItems.Dequeue();
                    }
                }
            }

         }

    } // WorkerLoop

} // Worker

Это общая идея, могут потребоваться некоторые дополнительные настройки.

Чтобы использовать его, вам нужно, чтобы ваши классы реализовалиэти два интерфейса:

IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();

Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();

// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();

// following line will block this (calling) thread
// until all items are consumed
worker.Stop();

Самое замечательное в этом то, что:

  • вы можете иметь столько работников, сколько вам нравится
  • несколько работников могут принимать предметы изодин и тот же производитель
  • несколько работников могут отправлять элементы одному и тому же потребителю (хотя это означает, что вам нужно принять случай, когда потребитель реализован в поточно-ориентированном режиме)
1 голос
/ 10 ноября 2010

То, что вы хотите, может быть сделано с помощью условных переменных. Я напишу пример псевдокода, который не должен быть слишком сложным для реализации.

Одна нить имеет что-то вроде:

while(run)
  compose message
  conditionvariable.lock()
  add message to queue
  conditionvariable.notifyOne()
  conditionvariable.release()

В то время как другой поток имеет что-то вроде этого

while(threadsafe_do_run())
  while threadsafe_queue_empty()
       conditionvariable.wait()
  msg = queue.pop()
  if msg == "die"
      set_run(false)
  conditionvariable.release()
  send msg

Так что, если вы не получили никаких сообщений, нажмите «die-message». То же самое, когда все сообщения были обработаны.

do_run () и queue_empty () должны проверять свои вещи потокобезопасно, использовать соответствующие блокировки.

wait () возвращается, когда вызывается notifyOne (), а затем очередь отправляет сообщение. в большинстве сред условная переменная уже имеет блокировку, вам может понадобиться добавить оператор блокировки самостоятельно в .NET.

0 голосов
/ 10 ноября 2010

Я бы использовал один поток для работы всего процесса. Это генерирует почтовое тело и отправляет. Просто потому, что генерация почтового тела не займет времени, но отправка электронного письма будет.

Также, если вы используете SMTP-сервер, который поставляется с Windows, вы можете просто оставить свою электронную почту в папке очереди, а сервер smtp позаботится об отправке электронной почты.

Таким образом, вы можете запустить несколько потоков (сохраняя ограничение числа), где каждый поток выполняет свою работу. Если вы работаете с набором заданий (данных), то вы можете распараллеливать данные (то есть разбивать данные на куски, которые соответствуют числу ядер в системе, например, и распределять задания (потоки)

Использование Задач сделает все это немного проще в любом случае, то есть 2 потока для отправки одного письма или один поток для выполнения всей работы, но использование нескольких потоков для параллельного выполнения нескольких задач.

...