0MQ: Как использовать ZeroMQ в многопоточном режиме? - PullRequest
16 голосов
/ 30 апреля 2011

Я прочитал руководство ZeroMq и наткнулся на следующее:

НЕ ДОЛЖНО делиться сокетами ØMQ между потоками.Разъемы ØMQ не безопасны.Технически это возможно, но для этого требуются семафоры, блокировки или мьютексы.Это сделает ваше приложение медленным и хрупким.Единственное место, где удаленно делиться сокетами между потоками, - это языковые привязки, которые должны творить чудеса, такие как сборка мусора в сокетах.

и позже:

Помните: Не используйте и не закрывайте сокеты, кроме как в потоке, который их создал.

Я также понял, что ZeroMQ Context является потокобезопасным.

Есликласс регистрирует событие другого класса, в .Net это событие может вызываться из потока, отличного от потока, в котором был создан прослушиватель.

Я думаю, что есть только две опции, которые можно отправитьчто-то через ZeroMQ-Sockets из обработчика событий:

  • Синхронизировать поток-обработчик событий с потоком, в котором был создан ZeroMQ- Socket в
  • Создать новый ZeroMQ- Socket / получить существующий ZeroMQ- Socket для потока в обработчике событий, используя потокобезопасный ZeroMQ- Context

Похоже, что 0MQ-Guide поВо-первых, и я не думаю, что создание нового ZeroMq-Socket для каждого потока - это эффективный способ.

Мой вопрос :
Каков правильный шаблон(как это должно быть) публиковать сообщения через 0MQ из обработчика событий?

Кроме того, авторы руководства имели в виду ZeroMQ-Binding для .Net, когда писали:

Единственное место, где удаленно делиться сокетами между потоками, - это языковые привязки, которые должны творить чудеса, такие как сборка мусора в сокетах.?

Вот пример кода, чтобы подчеркнуть мою проблему / вопрос:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}

Ответы [ 3 ]

26 голосов
/ 30 апреля 2011

Вы можете создать множество сокетов 0MQ, конечно, столько, сколько у вас есть потоков.Если вы создаете сокет в одном потоке и используете его в другом, вы должны выполнить полный барьер памяти между двумя операциями.Все остальное приведет к странным случайным сбоям в libzmq, поскольку объекты сокетов не являются потокобезопасными.

Существует несколько обычных шаблонов, хотя я не знаю, как они соотносятся конкретно с .NET:

  1. Создание сокетов в потоках, которые их используют, точка.Совместное использование контекстов между потоками, которые тесно связаны в одном процессе, и создание отдельного содержимого в потоках, которые не связаны тесно.В высокоуровневом C API (czmq) они называются присоединенными и отсоединенными потоками.
  2. Создайте сокет в родительском потоке и передайте во время создания потока присоединенному потоку.Вызов создания потока выполнит полный барьер памяти.С этого момента используйте сокет only в дочернем потоке.«использование» означает recv, send, setsockopt, getsockopt и close.
  3. Создайте сокет в одном потоке и используйте в другом, выполняя свой собственный полный барьер памяти между каждым использованием.Это чрезвычайно деликатно, и если вы не знаете, что такое «полный барьер памяти», вам не следует этого делать.
7 голосов
/ 01 декабря 2012

В .net framework v4 и выше вы можете использовать параллельный сбор для решения этой проблемы. А именно Производитель-Потребитель. Несколько потоков (обработчиков) могут отправлять данные в потокобезопасную очередь, и только один поток потребляет данные из очереди и отправляет их через сокет.

Вот идея:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
2 голосов
/ 01 июля 2011

Не забудьте взглянуть на транспорт inproc. Может быть полезно использовать inproc: // сокеты для межпотокового взаимодействия и иметь один поток, который открывает сокеты для связи с другими процессами / серверами.

Вам все еще нужен по крайней мере один сокет на поток, но в inproc не используется сетевой уровень IP вообще.

...