Опрос ZeroMQ, когда сокет привязан к нескольким адресам, не всегда работает - PullRequest
0 голосов
/ 14 ноября 2018

Я создаю приложение, используя ZeroMQ в качестве инфраструктуры. Я пытаюсь отладить сантехнику и столкнулся с неожиданным поведением. Примеры приведены на python для простоты, но моя цель - C ++

Это сервер world , написанный в цикле опроса и связанный с двумя адресами

#!/usr/bin/python

from __future__ import print_function

import zmq
context = zmq.Context()

s = context.socket(zmq.REP)
#s.bind("tcp://*:5555")
s.bind("ipc://world.ipc")

poller = zmq.Poller()
poller.register(s, zmq.POLLIN)

while True:
    socks = dict(poller.poll())
    print("... poll returned ...", socks)

    if socks.get(s) == zmq.POLLIN:
        m = s.recv()
        print("Received request: [{0}]".format(m))

        s.send(b"World")

Это привет клиент, использующий один из адресов

#! /usr/bin/python

from __future__ import print_function

import zmq

context = zmq.Context()

# Socket to talk to server
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.connect("ipc://world.ipc")

# Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request {0} ...".format(request))
    socket.send(b"Hello")

    # Get the reply
    message = socket.recv()
    print("Received reply {0} [ {1} ]".format(request, message))

Если клиент hello подключается к двум адресам, дела идут на юг. Если сервер привязан (только) к IPC-адресу, а клиент подключается как к IPC, так и к TCP-адресам (в этом порядке), будет получен ровно один ответ, а затем клиент зависнет. Если TCP-адрес подключен первым, ответы не принимаются.

Если сервер world связывается с двумя адресами, клиент кажется более устойчивым, независимо от того, в каком порядке сервер связывает адреса.

Однако, учитывая немного другой клиент (ближе к моей желаемой реализации), встроенный в веб-сервер wsgiref, у меня постоянный сбой на IPC-адресе:

#!/usr/bin/python
from __future__ import print_function

import zmq
context = zmq.Context()

import wsgiref
from wsgiref.simple_server import make_server

html_head = """<html><head>{meta}<title>{title}</title>{script}</head>"""

def create_head(title, meta="", script=""):
    return html_head.format(title=title, meta=meta, script=script)

def create_foot():
    return "</body></html>"

default_resp_headers = [
    ('Content-Type', 'text/html; charset=utf-8'),
    ]

import sys
class MyApp(object):
    def doIndex(self, environ):
        # Handle the default case

        response_body = [ create_head("Hello App"),
"""
<body>
<h1>Text goes here</h1>
<a href='/hello'>Hello World</a>
""",
                      create_foot(),
    ]

        status = '200 OK'
        response_headers = default_resp_headers[:]

        return status, response_headers, response_body

    def do_hello(self, environ):
        socket = context.socket(zmq.REQ)
        print("DEBUG: connecting to hello socket"); sys.stdout.flush()
        socket.connect("ipc://world.ipc")
#        socket.connect("tcp://localhost:5555")
        print("DEBUG: connected; sending hello"); sys.stdout.flush()
        socket.send(b"Hello")
        print("DEBUG: sent; receiving reply"); sys.stdout.flush()

        response_body = [ create_head("Remote Hello"),
        ]
        if socket.poll(2000) == zmq.POLLIN:
            message = socket.recv()
            print("DEBUG: received", message); sys.stdout.flush()

            response_body.append(
"""<body>
<h1>Hello {message}</h1>
""".format(message=message)
                )

            status = '200 OK'
        else:
            response_body.append(
"""<body><h1>Error</h1><p>Hello Server not responding</p> """
            )
            status = '504 Gateway Timeout'

        response_body.append(create_foot())
        response_headers = default_resp_headers[:]
        return status, response_headers, response_body

    def __call__(self, environ, start_response):
        wsgiref.util.shift_path_info(environ)
        path = environ.get('SCRIPT_NAME', '/')[1:]  # strip the leading /
        method_name = "do_"+path
        print("DEBUG: finding", method_name)
        if not hasattr(self, method_name): method_name = "doIndex"

        print("DEBUG: calling", method_name)
        method = getattr(self, method_name)
        print("DEBUG: method", repr(method))

        status, response_headers, response_body = method(environ)

        # the content-length is the sum of all string's lengths
        content_length = sum([len(s) for s in response_body])
        response_headers.append( ('Content-Length', str(content_length)) )

        start_response(status, response_headers)
        return response_body

def event_loop(interface, port):
    httpd = make_server(interface, port, MyApp())

    dispatch = {}
    poller = zmq.Poller()

    # For each socket to be watched, register with the poller and set a dispatch function. 
    poller.register(httpd, zmq.POLLIN)
    dispatch[httpd.fileno()] = httpd.handle_request  # note this is the bound function, not the invocation

    while True:
        for s,f in poller.poll():
            if f == zmq.POLLIN: dispatch[s]()

if __name__ == "__main__":
    event_loop("", 8051)

Там происходит много всего, что не обязательно относится к проблеме. Ключевой частью является метод do_hello. Это вызывается, когда сделан запрос типа curl http://localhost:8051/hello. Если сокет подключается к IPC-адресу, время ожидания запроса истекает. Если он подключается к TCP-адресу, он всегда работает.

Надеюсь, это понятно. Я все еще изучаю ZMQ, поэтому, если я пропустил что-то очевидное, я буду рад обратной связи.

...