Я написал API для связи с веб-сайтом, используя websocketapp.Работает нормально только на 2 шт.Если я помещаю свой код на каждый другой компьютер, веб-сокет не получает никакого сообщения и закрывается.Я пробовал много разных машин и операционных систем, много версий python (включая ту же, что работает), беспроводное и проводное соединение, но ничего не изменилось.Там нет ошибки или исключения.Что это может быть?
РЕДАКТИРОВАТЬ: я не владею веб-сайтом или сервером.Все остальные методы отправляют сообщения и анализируют ответ в on_socket_message
import requests
import websocket
import time
from threading import Thread
from datetime import datetime
import json
from position import Position
from constants import ACTIVES
class IQOption():
practice_balance = 0
real_balance = 0
server_time = 0
positions = {}
instruments_categories = ["cfd","forex","crypto"]
top_assets_categories = ["forex","crypto","fx-option"]
instruments_to_id = ACTIVES
id_to_instruments = {y:x for x,y in ACTIVES.items()}
market_data = {}
binary_expiration_list = {}
open_markets = {}
digital_strike_list = {}
candle_data = []
latest_candle = 0
position_id = 0
quotes =[]
position_id_list=[]
def __init__(self,username,password,host="iqoption.com"):
self.username = username
self.password = password
self.host = host
self.session = requests.Session()
self.generate_urls()
self.socket = websocket.WebSocketApp(self.socket_url,on_open=self.on_socket_connect,on_message=self.on_socket_message,on_close=self.on_socket_close,on_error=self.on_socket_error)
def generate_urls(self):
"""Generates Required Urls to operate the API"""
#https://auth.iqoption.com/api/v1.0/login
self.api_url = "https://{}/api/".format(self.host)
self.socket_url = "wss://{}/echo/websocket".format(self.host)
self.login_url = self.api_url+"v1.0/login"
self.profile_url = self.api_url+"profile"
self.change_account_url = self.profile_url+"/"+"changebalance"
self.getprofile_url = self.api_url+"getprofile"
def login(self):
"""Login and set Session Cookies"""
print("LOGIN")
data = {"email":self.username,"password":self.password}
self.log_resp = self.session.request(url="https://auth.iqoption.com/api/v1.0/login",data=data,method="POST")
requests.utils.add_dict_to_cookiejar(self.session.cookies, dict(platform="9"))
self.__ssid = self.log_resp.cookies.get("ssid")
print(self.__ssid)
self.start_socket_connection()
time.sleep(1) ## artificial delay to complete socket connection
self.log_resp2 = self.session.request(url="https://eu.iqoption.com/api/getprofile",method="GET")
ss = self.log_resp2._content.decode('utf-8')
js_ss=json.loads(ss)
self.parse_account_info(js_ss)
self.balance_id = js_ss["result"]["balance_id"]
self.get_instruments()
self.get_top_assets()
self.setOptions()
#self.getFeatures()
time.sleep(1)
print(js_ss["isSuccessful"])
return js_ss["isSuccessful"]
def on_socket_message(self,socket,message):
#do things
def on_socket_connect(self,socket):
"""Called on Socket Connection"""
self.initial_subscriptions()
print("On connect")
def initial_subscriptions(self):
self.send_socket_message("ssid",self.__ssid)
self.send_socket_message("subscribe","tradersPulse")
def on_socket_error(self,socket,error):
"""Called on Socket Error"""
print(message)
def on_socket_close(self,socket):
"""Called on Socket Close, does nothing"""
def start_socket_connection(self):
"""Start Socket Connection"""
self.socket_thread = Thread(target=self.socket.run_forever)
self.socket_thread.start()
def send_socket_message(self,name,msg):
#print(msg)
data = {"name":name,"msg":msg}
self.socket.send(json.dumps(data))