Время проблемы с ZMQ Recv и recv_string - PullRequest
0 голосов
/ 05 октября 2018

этот код, который я использую ниже, является сабвуфером ZMQ для издателя, который предоставляет мне данные.Он использует счетчик, чтобы сообщить мне, когда его 30 и 59 секунд, чтобы запустить мою запись в CSV каждые 30 секунд или около того.

Проблема: я сейчас синхронизирую все процессы в моем потоке.строки, где message и message2 = socket.recv_string () занимают где-то от полсекунды до 20 секунд для получения строки.Таким образом заставляя нить пропустить 30 и 59 секундные интервалы, которые я установил.Это не происходило вчера.Другие таймеры для операторов if занимают 0,00001 или 0,0 секунды.Так что эта часть не проблема

Мне интересно, что может повлиять на это.Может ли это быть вычислительная мощность моего компьютера?Или строка приема основана на том, как долго издатель действительно что-то отправляет?

Я не работаю в среде разработки или производства, а на общем сервере с чем-то вроде 15 других людей и еговиртуальная.Нулевой клиент.У меня никогда не было этой проблемы раньше, и по другому сценарию, который я настроил для другого паба / саба ZMQ, я получаю сообщения за 0,01 или 0,001 секунды вплоть до 3 секунд.Который более управляем, но норма была .01.

Любые советы или помощь были бы удивительными.Заранее спасибо

import zmq
import pandas as pd
import time
import threading



df_fills = pd.DataFrame()
df_signal = pd.DataFrame()
second_v = [30,59]
s = 0
m = 0
h = 0
d = 0

def counter():
    global h,s,m,d
    while True:
        s += 1
        #print("Second:{}".format(s))
        if s >=60:
            m +=1
            s = 0
        if m >= 60:
            h += 1
            m = 0
        if h >= 24:
            d += 1
            h = 0     
        #print(s)
        time.sleep(1)


class zmq_thread(threading.Thread):
    def __init__(self,name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):  
        global df_fills, second_v,s 
        print('zmq started')
        context = zmq.Context()
        socket = context.socket(zmq.SUB)              
        socket.connect(SERVER)        
        socket.setsockopt_string(zmq.SUBSCRIBE,'F')            
        print('socket connected')     
        tickers = [a bunch of tickers] 
        while True:                                 
            try:
                start2 = time.time()                               
                if s == 30:
                    print('break')
                    if df_fills.empty == True:
                       print('running fill thread again')
                       z = zmq_thread('Start_ZMQ') 
                       #time.sleep(.7)
                       z.run()  
                    else:
                        start = time.time()
                        print('writing fills')
                        filename = "a CSV"
                        with open(filename, 'a') as f:
                            df_fills.to_csv(f, encoding = 'utf-8', index = False, header = False)
                            f.close()     
                            print('wrote fills')                                          
                            end = time.time()
                            print(end-start)
                            df_fills = df_fills.iloc[0:0]                    
                            z = zmq_thread('Start_ZMQ') 
                            z.run()                    
                    return df_fills     
                end2 = time.time()
                print(end2-start2) 
                start3 = time.time()
                message = socket.recv_string()
                message2 = socket.recv_string()  
                end3 = time.time()
                print(end3-start3, 'message timing')
                print(s)
                start1 = time.time()
                if message == 'F':
                    # message2_split = message2.split("'")
                    message2_split = message2.split(";")
                    message3_split = [e[3:] for e in message2_split]
                    message4 = pd.Series(message3_split)

                    if message4[0] in tickers:
                        df_fills = df_fills.append(message4, ignore_index=True)
                        print('fill')
                end1 = time.time()
                print(end1-start1)
            except KeyboardInterrupt:
                break              



counter = threading.Thread(target = counter)
zmq_loop = zmq_thread('Start_ZMQ')
#%%


counter.start()
zmq_loop.start()

1 Ответ

0 голосов
/ 05 октября 2018

Я не осознавал, что типичная recv_string в ZMQ - это блокировка по умолчанию.Итак, я сделал это

               message = socket.recv_string(flags = zmq.NOBLOCK)
               message2 = socket.recv_string(flags = zmq.NOBLOCK)               
           except zmq.ZMQError as e:               
               if e.errno == zmq.EAGAIN:
                    pass             
           else:     
                if message == 'ABA_BB':
                   message2_split = message2.split(";")
                   message3_split = [e[3:] for e in message2_split]
                   message4 = pd.Series(message3_split)
                   #print(message4)
                   if message4[2] == '300':                        
                       df_signal = df_signal.append(message4, ignore_index=True)               
                       print('Signal Appended')
...