Websocket получает закрытие соединения через ~ 2 часа, несмотря на пинг - PullRequest
0 голосов
/ 13 мая 2018

Программа берет данные из gdax websocket и сохраняет их в таблице sqlite. Программа работает нормально в течение первых 2-2,5 ч, и после этого я получаю сообщение об ошибке закрытия соединения от отладчика. Ведьма странная, потому что я верил, что программа сможет перехватить эту ошибку без помощи отладчика.

Вот урезанная версия моего кода:

import time
import datetime
import json
import sqlite3
import urllib3
import timeit
import urllib.request
import requests
from websocket import create_connection

conn=sqlite3.connect(":memory:")
c = conn.cursor()  
c.execute("CREATE TABLE `Ticker` (  `ID`    INTEGER PRIMARY KEY AUTOINCREMENT,  `Type`  TEXT,   `Sequence`  INTEGER,    `Product_id`    TEXT,   `Price` INTEGER,    `Open_24h`  INTEGER,    `Volume_24h`    INTEGER,    `Low_24h`   INTEGER,    `High_24h`  INTEGER,    `Volume_30d`    INTEGER,    `Best_bid`  INTEGER,    `Best_ask`  INTEGER,    `side`  TEXT,   `time`  TEXT,   `trade_id`  INTEGER,    `last_size` INTEGER)")
class Websocket():

def __init__(self, wsurl="wss://ws-feed.gdax.com", ws=None, agi1=2, ping_start=0,produkty=['ETH-EUR', 'LTC-EUR', 'BTC-EUR'], newdict={}):
    self.wsurl = wsurl
    self.ws = None
    self.agi1=agi1
    self.ping_start=ping_start
    self.produkty=produkty
    self.newdict=newdict
    print(self.wsurl)
    print(self.produkty)

def KonektorWebsocketTicker(self):
    x=0
    b=['type', 'sequence', 'product_id', 'price', 'open_24h', 'volume_24h', 'low_24h', 'high_24h', 'volume_30d', 'best_bid', 'best_ask', 'side', 'time', 'trade_id', 'last_size']
    i=0
    dane=json.loads(self.ws.recv())
    typtranzakcji=dane.get('type',None)      
    if typtranzakcji=='error':                                          #obsługa błędu... mam nadzieje
        print(json.dumps(dane, indent=4, sort_keys=True))
    if typtranzakcji=='ticker':
        while i<len(b):
            a = eval("dane.get('" + b[i] + "', None)")
            self.newdict[b[i]]=a
            i=i+1
        c.execute("INSERT INTO Ticker(type, sequence, product_id, price, open_24h, volume_24h, low_24h, high_24h, volume_30d, best_bid, best_ask, side, time, trade_id, last_size) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", (self.newdict['type'], self.newdict['sequence'], self.newdict['product_id'], self.newdict['price'], self.newdict['open_24h'], self.newdict['volume_24h'], self.newdict['low_24h'], self.newdict['high_24h'], self.newdict['volume_30d'], self.newdict['best_bid'], self.newdict['best_ask'], self.newdict['side'], self.newdict['time'], self.newdict['trade_id'], self.newdict['last_size']))

def Polacz(self):
    self.ws=create_connection(self.wsurl)

    if self.agi1 == 0:
        kanaly =None
    elif self.agi1 == 1:
        kanaly= "heartbeat"
    elif self.agi1 == 2:
        kanaly=["ticker"]
    elif self.agi1 == 3:
        kanaly="level2"
    else:
        print('ERROR:WRONG ARGUMENT! (arg1)=', self.agi1)

    print(kanaly)

    if kanaly is None:
        self.ws.send(json.dumps({'type': 'subscribe', 'product_ids': self.produkty}))
    else:
        self.ws.send(json.dumps({'type': 'subscribe', 'product_ids': self.produkty, 'channels': [{"name": kanaly, 'product_ids': self.produkty,}]}))    #wysłanie subskrybcji
    while True:
        try:
            if (time.time() - self.ping_start) >= 20:
                self.ws.ping("ping")
                print('%s ping' % time.ctime())
                self.ping_start = time.time()
        except ValueError as e:
            print('{} Error :{}'.format(time.ctime(), e))
        except Exception as e:
            print('{} Error :{}'.format(time.ctime(), e))
        else:
            print('%s  no ping' % time.ctime())

        dane=json.loads(self.ws.recv())


        if self.agi1==2:
            self.KonektorWebsocketTicker()

        conn.commit()
        time.sleep(1)

class Autotrader():
a='1'
b='1'
if a=="1":
    produkty=["BTC-EUR"]

if b=='1':
    webs=Websocket(produkty=produkty)
    webs.Polacz()  

Может кто-нибудь объяснить мне, в чем здесь проблема, потому что отладчик не очень полезен, и случайное изменение кода и ожидание + 2 ч информации об ошибке не является хорошим способом исправить это

1 Ответ

0 голосов
/ 28 июня 2018

На какой бирже вы планируете торговать?Обычно на github есть несколько хороших библиотек для больших обменов:)

Я считаю, что проблема заключается в "self.ws.recv ()", который останавливает выполнение до получения сообщения.Это не позволяет вам достичь "self.ws.ping (" ping ")", который поддерживает ваш веб-сокет.

Поскольку я работаю над чем-то похожим, у меня пока нет конкретного решения, но я цитирую Многопоточный, неблокирующий клиент веб-сокета кажется, что keepalive должен запускаться в отдельном потоке, чтобы он не блокировался:

"Всегда будет выделенный потокпрослушивать сокет (в данном случае основной поток, который входит в цикл внутри run_forever, ожидающего сообщений). Если вы хотите, чтобы какая-то другая вещь продолжалась, вам понадобится другой поток. "

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...