PyZMQ req socket - зависание на context.term () - PullRequest
1 голос
/ 01 ноября 2019

Пытается корректно завершить работу простого клиента на основе pyzmq, когда сервер недоступен. Ниже приведены 2 фрагмента.

Первый сервер. Это более или менее пример pyzmq. Никакого специального кода здесь:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})

Следующий клиент.

import zmq

ip = 'localhost'
port = 5555
addr ="tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)        
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

Здесь я попытался усовершенствовать клиента по умолчанию возможностью тайм-аута, если сервер недоступен. В приведенном ниже коде используется метод poll, хотя я также пытался установить таймаут приема для сокета.

Если сервер работает, клиент отправляет и получает ответ и корректно завершает работу.

Если сервер не работает, клиент сразу проходит через первый вызов socket.poll (поскольку zmq просто буферизует сообщение внутри). Он блокируется на 1 секунду при втором вызове socket.poll и правильно пропускает блок recv_json. Затем он зависает на вызове context.term(). Насколько я понимаю из поиска, это будет зависать, если есть сокеты, которые не были закрыты, что, похоже, не так.

Любая помощь очень ценится.

1 Ответ

1 голос
/ 04 ноября 2019

Вкл. возможность тайм-аута, если сервер недоступен"

Тайм-аут возможен, но это не позволит использовать аппаратно REQ/REP два-шаговый танец, чтобы выжить, тем не менее, чтобы продолжить в надлежащем порядке, если одна сторона тайм-аута в противном случае обязательный шаг в d присваивается F inite S tate A автоматическая схема (dFSA не может использовать односторонние ярлыки, это двусторонний dFSA).


Гипотеза:

Еслисервер не работает, клиент сразу проходит через первый вызов socket.poll (так как zmq просто буферизует сообщение внутри). Он блокируется на 1 секунду при втором вызове socket.poll и правильно пропускает блок recv_json. Затем он зависает при вызове context.term().

Проверка:

Давайте рассмотрим код пошагово

def  Test( SetImmediate = False ):
     ##################################################################################
     import zmq, json, time;                                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
     ##################################################################################
     ip      = 'localhost';                                                       print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
     port    =  5555;                                                             print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
     addr    = "tcp://{0}:{1}".format( ip, port );                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
     message = { 'value': 10 };                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
     ##################################################################################
     context = zmq.Context();                                                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;              aReqSock = context.socket( zmq.REQ );                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ),                        "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     pass;              aReqSock.setsockopt(       zmq.LINGER,    0 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER    ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     if SetImmediate:
                        aReqSock.setsockopt(       zmq.IMMEDIATE, 1 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incoplete connections
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;              aReqSock.connect( addr );                                 print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ),                      "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################
     pass;        rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ),   "|", zmq.strerror( zmq.zmq_errno() ) )
     if      0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... ==  " ), rc )
         pass;          aReqSock.send_json( message,   flags = zmq.NOBLOCK )      # .send()-s dispatches message the REP-side may .recv() at some later time
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ),            "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;    rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ),    "|", zmq.strerror( zmq.zmq_errno() ) )
         if  0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
             pass;                                                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
             response = aReqSock.recv_json()                                      # .recv() BLOCKS until ... if ever ...
             print( response );                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;                                                                        print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
     ##################################################################################
     rc = aReqSock.disconnect( addr );                                            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = aReqSock.close(      linger = 0 );                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = context.term();                                                         print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################

Это приводит к чему-то следующему:

>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER:  4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... ==   2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~  None | Success

и

>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER:  4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~  None | Success

Что доказывает, что гипотеза не верна: с * 1051 нет проблемcontext.term(), но со способом, как .connect( aTransportClass_Target ) против цели, которой нет, обрабатывается внутренне.

К моему удивлению, в тестируемой версии (v4.2.5).poll( zmq.POLLOUT ) отчеты содержат 2 элементов в направлении .POLLOUT, уже присутствующих в сообщаемом пользователем состоянии TxQueue, без единого явного .send() (поскольку .poll() был запущен сразу после .connect()).

Мне кажется, что это является некоторым несовместимостью с предыдущими версиями (как если бы он пытался сообщить о .connect() -связанном "протоколе / идентичности""-телеметрия вместо того, чтобы сообщать только о сообщениях на уровне приложения пользователя).

В то время как я могу ошибаться, пытаясьвыясните какое-либо обоснование того, почему принципиально пустая очередь будет пытаться сообщить о сообщении, которое уже было в его .POLLOUT -направлении, я надеюсь, что оно достаточно доказано, проблема не имеет ничего общего с .LINGER == 0 / .term() - экземпляр Context() - экземпляр.

КЭД

...