Лучший шаблон обмена сообщениями между процессами для регулярного прослушивания нескольких процессов на одном локальном хосте - PullRequest
0 голосов
/ 23 июня 2018

У меня странный случай, когда мне нужно создать много процессов из моего основного процесса.

Эти процессы, которые я создаю, будут ставить в очередь некоторые сообщения из веб-сокета.

И через некоторый интервал, как каждую секунду или около того, я буду опрашивать эти небольшие процессы из моего основного процесса.Я использую язык D, а библиотека сообщений ZMQD (это просто оболочка для библиотеки C zmq).

Минимальный пример, который я имею для моего основного процесса:

Socket*[] socketList;

string sRecv( Socket* socket)
{
    ubyte[256] buffer;
    immutable size = socket.receive(buffer);
    import std.algorithm: min;
    return buffer[0 .. min(size,256)].idup.asString();
}

void startServer( string servername )
{
    auto pid = spawnProcess(["/home/erdem/eclipse-workspace/WebSocketDenemesi/websocketdenemesi",
                              servername, "\n"]);
    auto requester = new Socket(SocketType.req);
    auto allName = "ipc:///tmp/" ~  servername;
    requester.connect(allName);
    socketList ~= requester;

}

void main() {

    import std.array : split;
    import std.algorithm : each;

    startServer("iotabtc@depth");
    startServer("iotabtc@aggTrade");
    startServer("ethbtc@depth");

    int counter = 30;
    while(counter--) {
        foreach ( requester; socketList)
        {
            requester.send("send"); 
        }

        foreach ( requester; socketList)
        {
            auto strList = sRecv(requester).split("\n");
            strList.each!( str => writefln("Received [%d]reply [%s]", strList.length,  str) );

        }
        sleep(1000.msecs);
    }
    foreach ( requester; socketList)
    {
        requester.send("done"); 
    }
}

И минимальный пример, который я имею для моих небольших процессов:

WebSocket startSocket( string temp )
{
    auto ws_url = URL(temp);
    auto ws = connectWebSocket(ws_url);
    if ( !ws.connected )
        return null;    
    return  ws;
}

void close( WebSocket ws )
{
    int timeOut = 5;
    while ( ws && ws.connected && timeOut-- )
    {
        vibe.core.concurrency.async( { ws.close(); return true;} ); 
        sleep(5.msecs);
    }   
}

string sRecv(ref Socket socket)
{
    ubyte[256] buffer;
    immutable size = socket.tryReceive(buffer)[0];
    import std.algorithm: min;
    return size ? buffer[0 .. min(size,256)].idup.asString() : "";
}

void main( string[] args ) {

    auto responder = Socket(SocketType.rep);
    string seperatorChar = args[2];
    string temp = "ipc:///tmp/" ~ args[1];
    responder.bind(temp);

    string socketName =  "wss://stream.binance.com:9443/ws/" ~ args[1];
    auto curSocket = startSocket(socketName);
    string curString;
    while (true) {
        auto result = responder.sRecv();
        if ( result == "send")
        {
            responder.send(curString);      
            curString = "";
        }
        else if ( result == "done" )
        {
            break;
        }
        else 
        {
            if ( curSocket.dataAvailableForRead )
            {
                auto text = curSocket.receiveText();
                if ( !curString.empty )
                   curString ~= seperatorChar;
                curString ~= text;
            }
        }
        sleep(100.msecs);
    }
    writeln( "Shutting down: ", args[1]);
    curSocket.close();

}

Это первый разЯ использую эту библиотеку сообщений.Вот почему я использую простые REQ/REP розетки.Есть ли лучший способ достичь моего требования.Например, есть ли лучший шаблон обмена сообщениями?Например, есть шаблон, в котором мои небольшие процессы не блокируются responder.receive( buffer );.

Если они есть, мне не нужно будет прослушивать websocket из другого потока.

1 Ответ

0 голосов
/ 24 июня 2018

Добро пожаловать в распределенные вычисления на базе ZeroMQ

Есть ли лучший шаблон обмена сообщениями например?

Это зависит от того, как ваши процессы должны взаимодействовать. Короче говоря, использование REQ/REP в режиме блокировки - почти худший вариант из меню.

  • учитывая, что ваш веб-сокет только что получает асинхронную часть информации (что является обычным способом, как рынки повторно транслируют поток событий), чистый ws.recv() + PUSHer.send() + if PULLer.poll(): PULLer.recv() конвейеризованное событие- получение + PUSH/PULL распространение + условная повторная обработка лучше всего соответствует реальному поведению.

  • учитывая, что ваш след обрабатывающей фермы может превысить один локальный хост, другие транспортные классы для нелокальных узлов ~ { tipc:// | tcp:// | udp:// | pgm:// | epgm:// | norm:// | vmci:// } могут попасть в игру вместе с ipc:// -линками на вашем текущем локальном хосте - Прозрачность ZeroMQ при работе с этой смесью является отличным преимуществом при освоении Zen-of-Zero.

  • Заданная задержка является критически важной при широкомасштабном распределении обработки, а PUB/SUB Масштабируемая модель археотипа формальной связи может стать выгодной, с возможностью использования .setsockopt( zmq.CONFLATE, 1 ) для узлов без регистрации где только самые последние цены актуальны для принятия любых ответных действий XTO любого рода.

...