Многопоточность, семафор и события - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь выполнить некоторые команды, которые приходят из RabbitMQ. Это около 5 мсг / сек. Так как слишком много сообщений, я должен отправить поток для выполнения, но у меня не так много потоков, поэтому я установил ограничение 10.

так что идея заключалась в том, чтобы сообщения MSS приходили к работнику, помещались в очередь, и любой из 10 потоков достигал пика и выполнялся. Все это с помощью семафора.

После некоторых экспериментов я не знаю почему, но мой поток выполняет только 3 или 4 элемента, после этого он просто останавливается без ошибок ...

Проблема, которую я считаю, заключается в логике, когда событие вызывает метод для выполнения, не мог думать лучше ...

Почему обрабатываются только первые 4 сообщения ??

Какой шаблон или лучший способ сделать это?

Вот некоторые части моего кода:

const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();

static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
               {
                var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
                Console.WriteLine($"Sub-> {m.Subject}");
                queue.Enqueue(ea);
                RUN();
              };

            channel.BasicConsume(queueName, false, consumer);

            Console.Read();
}

private static void RUN()
{
            while (queue.Count > 0)
            {
                sem.WaitOne();
                var item = queue.Dequeue();
                ThreadPool.QueueUserWorkItem(sendmail, item);
            }
}

private static void sendmail(Object item)
{

//.....soem processing stuff....

//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);

//release thread
sem.Release();

}

1 Ответ

0 голосов
/ 17 мая 2018

Я думаю, что вы могли бы использовать блокирующую коллекцию здесь. Это упростит код. Таким образом, ваш отправитель электронной почты будет выглядеть примерно так:

public class ParallelEmailSender : IDisposable
{
    private readonly BlockingCollection<string> blockingCollection;

    public ParallelEmailSender(int threadsCount)
    {
        blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
        for (int i = 0; i < threadsCount; i++)
        {
            Task.Factory.StartNew(SendInternal);
        }
    }

    public void Send(string message)
    {
        blockingCollection.Add(message);
    }

    private void SendInternal()
    {
        foreach (string message in blockingCollection.GetConsumingEnumerable())
        {
            // send method
        }
    }

    public void Dispose()
    {
        blockingCollection.CompleteAdding();
    }
}

Конечно, вам нужно будет добавить логику перехвата ошибок, а также можно улучшить процесс завершения работы приложения, используя токены отмены.

Я настоятельно рекомендую прочитать замечательную электронную книгу о многопоточном программировании , написанную Джозефом Албахари.

...