ZeroMQ PUSH / PULL и потерянное сообщение - PullRequest
6 голосов
/ 23 февраля 2011

Я использую ZeroMQ из .NET и застрял, пытаясь исправить странную проблему. У меня есть сокет типа PUSH и один тип PULL поверх TCP. Когда клиент отключается, сервер все еще может отправить сообщение (обратите внимание, что в метод Socket.Send не передаются флаги), которое получает много лотов, прежде чем начать блокировку и ждать, пока клиент повторно подключится и доставит сообщения, которые я пытаюсь отправить позже.

Как я могу избежать потери сообщения (или в тесте наихудшего случая, если клиент подключен и если не отправляет фиктивное сообщение, которое я могу позволить себе потерять)?

Заранее спасибо!

Редактировать: дальнейшее тестирование показывает, что если я подожду 1 секунду после отправки первого сообщения после отключения клиентом, второе заблокируется, но если я вообще не жду, я могу отправить как много сообщений, как я хочу, и все они будут потеряны. Это довольно странно ...

1 Ответ

3 голосов
/ 04 апреля 2011

Документация ZeroMQ отмечает, что это проблема с настройками PUSH / PULL, и предлагает следующий шаблон: добавление настройки REP / REQ для обеспечения координации узлов, когда вы ожидаете фиксированное количество подписчиков.Однако, если вы не можете заранее узнать количество подписчиков, вам следует подумать об изменении протокола, чтобы он был более устойчивым к этим условиям.

Синхронизированный издатель в C (от ZGuide)

//
//  Synchronized publisher
//
#include "zhelpers.h"

//  We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED  10

int main (void) 
{
    s_version_assert (2, 1);
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }
    //  Now broadcast exactly 1M updates followed by END
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");

    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}
...