Отладка asyncio websockets в Python Jupyter - PullRequest
0 голосов
/ 29 мая 2019

У меня проблемы с запуском asyncio с веб-сокетами для моего python jupyter. Я прочитал эту существующую ветку: Как запустить асинхронный код Python в блокноте Jupyter?

Тем не менее, я все еще получаю ошибки, когда применяю исправление только что запущенного: await в Jupyter python.

Когда я использую стандартную строку кода:

asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))

Мое сообщение об ошибке выглядит следующим образом:

RuntimeError                              Traceback (most recent call last)
<ipython-input-40-23500a349044> in <module>
     65 asyncio.run_coroutine_threadsafe(call_api(msg), loop)
     66 
---> 67 asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
     68 

C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
    569         future.add_done_callback(_run_until_complete_cb)
    570         try:
--> 571             self.run_forever()
    572         except:
    573             if new_task and future.done() and not future.cancelled():

C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_forever(self)
    524         self._check_closed()
    525         if self.is_running():
--> 526             raise RuntimeError('This event loop is already running')
    527         if events._get_running_loop() is not None:
    528             raise RuntimeError(

RuntimeError: This event loop is already running

Я применяю исправление в ссылочной теме, как показано ниже

# jupyter cannot use python main syntax and have to use loop form below.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)

но тогда я не получаю никаких результатов, за исключением строки кода согласно:

<Future at 0xa02de10 state=pending>

полный код, как указано ниже.

# this code seems to work

import pandas as pd
import numpy as np
import scipy as sc
import datetime as dt
import time
import matplotlib.pyplot as plt

from datetime import date
import asyncio
import websockets
import json

from IPython.display import clear_output

# To subscribe to this channel:
msg = \
    {"jsonrpc": "2.0",
     "method": "public/subscribe",
     "id": 42,
     "params": {
        "channels": ["markprice.options.btc_usd"]}
    }


db = []
column = ['iv', 'mark_price', 'commod', 'expiry', 'strike', 'callput']

async def call_api(msg, t_delay =10, t_period =1):
    async with websockets.connect('wss://test.deribit.com/ws/api/v2') as websocket:
        await websocket.send(msg)

        c_time = 0.0 # initialize start time
        s_time = time.time() + t_delay # get end time time

        while (websocket.open)& (c_time <= s_time): # this is while webscoket is open, we use 'while websocket.open:'
            response = await websocket.recv()
            d = json.loads(response)
            if 'params' not in d:
                continue

            else:
                db = d['params']['data']

            # sets up the panda dataframe and the information transform.
            df = pd.DataFrame(db)
            df['splitinst'] = df['instrument_name'].str.split("-")
            df[['commod','expiry','strike','callput']] = pd.DataFrame(df.splitinst.values.tolist(), index=df.index)
            df['expiry'] = pd.to_datetime(df['expiry'])
            df = df.sort_values(by=['expiry','strike'])

            df.to_json('C:/settles.json', orient='records')
            df.to_csv('C:/settles.csv')


            await asyncio.sleep(t_period) # wait for this seconds before doing loo again
            c_time = time.time() # update the time

            clear_output(wait = True)
            print('current time is', c_time)

            print(df.iloc[0])

# jupyter cannot use python main syntax and have to use loop form below.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)

# asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
...