Как обеспечить уникальное локальное состояние среди потоков для обработки очереди электронной почты - PullRequest
2 голосов
/ 11 апреля 2011

СЦЕНАРИЙ:

Я хочу задать этот вопрос относительно Parallel.For (или любого другого многопоточного подхода в C # .Net). Мне нужно создать многопоточную службу Windows Mailer, которая будет отправлять почту всем получателям так быстро, как только сможет. Я получаю сериализованные строки из базы данных, которая содержит сообщение электронной почты и SmtpDetails, а затем десериализует их в коде.

Письма могут содержать 1000 получателей, и поэтому на двухъядерной машине (машине разработки) одновременно могут работать как минимум 2 потока. Так что я использую параллель. Для того чтобы сделать это. Я читал о делегате LocalInit, который запускается один раз для каждого потока.

КОД:

int itemCount = serMailObj.ReceipientList.Count;

                    Parallel.For(0, itemCount, () =>
                    {
                        return new ThreadLocalStateCache()
                        {
                            Receipient = serMailObj.ReceipientList.Dequeue(),
                            mail = serMailObj.Email,
                            SerializableSmtpDetails = serSmtpObj
                        };
                    }
     , doWork, (x) => { });

 private static ThreadLocalStateCache doWork(int instance, ParallelLoopState state, ThreadLocalStateCache threadInstance)
        {
            KeyValuePair<string, string> kvp = threadInstance.Receipient;
            SerializableSmtpDetails serSmtpObj = threadInstance.SerializableSmtpDetails;
            MailMessage email = threadInstance.mail;

                email.To.Add(new MailAddress(kvp.Key, kvp.Value));

                SmtpClient client = new SmtpClient();
                client.Credentials = new System.Net.NetworkCredential(serSmtpObj.UserName, serSmtpObj.Password);
                client.Host = serSmtpObj.Host;
                client.Port = serSmtpObj.Port;
                client.EnableSsl = serSmtpObj.EnableSSL;

                    try 
                    {           
                        client.Send(email);
                        Console.WriteLine("sending mail....");
                    }
                    catch (Exception)
                    {

                        throw;
                    }


            return null;
        }

public class ThreadLocalStateCache
    {
        public KeyValuePair<string, string> Receipient { get; set; }

        public MailMessage mail { get; set; }

        public SerializableSmtpDetails SerializableSmtpDetails { get; set; }
    }

Код выше довольно прост. Делегат localInit создает локальный объектный поток foreach. и затем doWork пытается обработать очередь.

ПРОБЛЕМЫ:

  1. Я получаю несколько писем для каждого получателя. кажется, что объект электронной почты распределяется между потоками.

  2. иногда происходит сбой при отправке почты.

Пожалуйста, объясните, как я могу изолировать объекты mail и smtpclient в каждом потоке. и обработать очередь.

РЕДАКТИРОВАТЬ 1: Если многопоточные гуру помогли бы мне, пожалуйста, скажите, есть ли у каждого потока возможность иметь уникальную копию своих локальных переменных, а не общих. Поскольку объект MailMessage не является неизменным, я не могу создать его клон. кроме десерализации его в каждом потоке (что могло бы гарантировать создание нового объекта), есть ли волшебный способ достичь этого?

Ответы [ 2 ]

1 голос
/ 11 апреля 2011

Возможны проблемы из-за возврата doWork() null. Как я узнал, отвечая на ваш недавний комментарий здесь , локальный объект потока должен передаваться между последующими вызовами тела Parallel.For в том же потоке, поскольку он должен работать как аккумулятор; см. пример использования в MSDN . Непонятно, что происходит, когда вы возвращаете null, но я бы это исправил и посмотрел, имеет ли это значение.

1 голос
/ 11 апреля 2011

ниже может быть проблемой:

serMailObj.ReceipientList.Dequeue()

Попробуйте использовать ConcurrentQueue (.NET 4) или установите блокировки так, чтобы один поток за раз мог удалить его из очереди.

также обеспечивает параллельные классы или блокировки в любом месте, где у вас есть доступ к общему ресурсу из потоков.

Очередь <(Of <(T>)>) может одновременно поддерживать несколько читателейДо тех пор, пока коллекция не будет изменена.Тем не менее, перечисление в коллекции по сути не является потокобезопасной процедурой.Чтобы гарантировать безопасность потоков во время перечисления, вы можете заблокировать коллекцию во время всего перечисления.Чтобы обеспечить доступ к коллекции из нескольких потоков для чтения и записи, необходимо реализовать собственную синхронизацию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...