Проблемы с asyncio: «Future pending» - PullRequest
1 голос
/ 25 апреля 2020

У меня сейчас проблемы с Joblib при запуске многопроцессной (параллельной) программы. Раньше мне удавалось заставить это работать, и общее время выполнения составляло около 1 минуты, однако потом я что-то изменил и сломал. Я привожу минимальный код, так как получаю с ним ту же ошибку. Я пытаюсь пройти по всем 150 биржевым символам и с помощью Yahoo Finance получить цепочку опционов для каждого из них. Мне нужно делать это ежеминутно. Я также пробовал другие библиотеки, например asyncio, но и с ними потерпел неудачу. Буду признателен за любые рекомендации.

import yfinance as yf


def background(f):
    """Decorator that runs ``f`` in the event loop's default executor.

    Calling the decorated function does not run ``f`` directly. It submits
    the call to the default executor of the current asyncio event loop and
    returns the future produced by ``run_in_executor``.

    Args:
        f: The blocking callable to run in the background.

    Returns:
        A wrapper with the same call signature as ``f`` that returns a
        future for the result of ``f``.
    """
    # Local imports keep this snippet self-contained.
    import asyncio
    import functools

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional arguments, so bind all
        # arguments (including keyword ones) up front.
        call = functools.partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(None, call)

    return wrapped

done = []


@background
def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` that expire soon.

    Because of the ``background`` decorator, callers receive a future for
    the value described under "Returns" rather than the value itself.

    Args:
        ticker: Symbol to look up through yfinance.
        daysOut: Only expirations at most this many days from now are
            downloaded.

    Returns:
        A DataFrame with the calls and puts of every downloaded chain, or
        an empty DataFrame when nothing could be downloaded.

    Download errors are logged instead of raised, and ``ticker`` is always
    appended to the module-level ``done`` list.
    """
    # Local imports: this snippet itself only imports yfinance.
    import logging
    from datetime import datetime

    import pandas as pd

    log = logging.getLogger(__name__)
    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
                # The chain carries separate ``calls`` and ``puts`` frames.
                frames.extend((chain.calls, chain.puts))
            except Exception:
                # Best effort: skip this expiration but keep the others.
                log.exception("%s: could not download chain for %s",
                              ticker, expiration_date)
    except Exception:
        log.exception("%s: could not list option expirations", ticker)
    finally:
        # Progress is tracked by the length of ``done``, so record the
        # ticker whether or not the download succeeded.
        done.append(ticker)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

Основная функция:

symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   


# Start one background download per symbol. The futures returned by the
# decorated downloadChain are kept so that completion can be awaited and
# failures inspected later instead of being dropped.
futures = [downloadChain(ticker) for ticker in symbols]

Я добавил отдельный цикл, чтобы отслеживать размер списка «done», в который попадают все обработанные символы. Не знаю, что именно я изменил, но теперь этот цикл завершается примерно через 10–15 минут вместо ожидаемой 1 минуты.

import time

# Show progress until every symbol has been recorded in ``done``. Sleeping
# between polls keeps this loop from spinning at full CPU while the
# downloads are running.
while len(done) < len(symbols):
    clear_output(wait=True)
    print(len(done))
    time.sleep(0.5)

# Print the final count once all downloads have reported in.
clear_output(wait=True)
print(len(done))

Ответы [ 2 ]

1 голос
/ 25 апреля 2020

Есть два варианта «исправления». Добавляю их в виде ответа, чтобы не обсуждать это в комментариях или чате :)

import asyncio
import pandas as pd
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor


def background(f):
    """Decorator that runs ``f`` on the module-level ``executor``.

    Calling the decorated function submits the call to ``executor`` through
    the current asyncio event loop and returns the future produced by
    ``run_in_executor``. ``executor`` is looked up at call time, so it must
    be bound before the decorated function is invoked.

    Args:
        f: The blocking callable to run in the background.

    Returns:
        A wrapper with the same call signature as ``f`` that returns a
        future for the result of ``f``.
    """
    import functools

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional arguments, so bind all
        # arguments (including keyword ones) up front.
        call = functools.partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(executor, call)

    return wrapped

done = []


@background
def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` that expire soon.

    Because of the ``background`` decorator, callers receive a future for
    the value described under "Returns" rather than the value itself.

    Args:
        ticker: Symbol to look up through yfinance.
        daysOut: Only expirations at most this many days from now are
            downloaded.

    Returns:
        A DataFrame with the calls and puts of every downloaded chain, or
        an empty DataFrame when nothing could be downloaded.

    Download errors are logged instead of raised, and ``ticker`` is always
    appended to the module-level ``done`` list.
    """
    # Local imports: ``datetime`` is not imported at the top of this snippet.
    import logging
    from datetime import datetime

    log = logging.getLogger(__name__)
    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
                # The chain carries separate ``calls`` and ``puts`` frames.
                frames.extend((chain.calls, chain.puts))
            except Exception:
                # Best effort: skip this expiration but keep the others.
                log.exception("%s: could not download chain for %s",
                              ticker, expiration_date)
    except Exception:
        log.exception("%s: could not list option expirations", ticker)
    finally:
        # Record the ticker whether or not the download succeeded.
        done.append(ticker)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   

# Run the downloads on an explicit thread pool. ``executor`` has to be a
# module-level name because the ``background`` wrapper looks it up when
# ``downloadChain`` is called. Leaving the ``with`` block shuts the pool
# down, which by default waits for the submitted work to finish.
with ThreadPoolExecutor() as executor:
    for ticker in symbols:
        # Only schedules the download; the work itself runs on the pool.
        downloadChain(ticker)

Второй вариант более стандартный: мы определяем функцию async main и просим asyncio использовать её в качестве главной точки входа.

import asyncio
import pandas as pd
import yfinance as yf
from concurrent.futures import ProcessPoolExecutor


symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   
done = []


def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` that expire soon.

    Args:
        ticker: Symbol to look up through yfinance.
        daysOut: Only expirations at most this many days from now are
            downloaded.

    Returns:
        A DataFrame with the calls and puts of every downloaded chain, or
        an empty DataFrame when nothing could be downloaded.

    Download errors are logged instead of raised, and ``ticker`` is always
    appended to the module-level ``done`` list of the process that runs
    this function.
    """
    # Local imports: ``datetime`` is not imported at the top of this snippet.
    import logging
    from datetime import datetime

    log = logging.getLogger(__name__)
    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
                # The chain carries separate ``calls`` and ``puts`` frames.
                frames.extend((chain.calls, chain.puts))
            except Exception:
                # Best effort: skip this expiration but keep the others.
                log.exception("%s: could not download chain for %s",
                              ticker, expiration_date)
    except Exception:
        log.exception("%s: could not list option expirations", ticker)
    finally:
        # Record the ticker whether or not the download succeeded.
        done.append(ticker)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


async def main():
    """Download the option chains of every symbol on a process pool.

    One ``downloadChain`` call per entry of ``symbols`` is submitted to a
    ``ProcessPoolExecutor`` and all of them are awaited, so an error raised
    in a worker propagates to the caller instead of being lost.

    NOTE: ``downloadChain`` runs in worker processes here, so whatever it
    appends to the module-level ``done`` list is not visible in this
    process.
    """
    # Inside a coroutine the running loop is the one that must schedule work.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        futures = [
            loop.run_in_executor(executor, downloadChain, ticker)
            for ticker in symbols
        ]
        # Wait for every download and surface the first failure, if any.
        await asyncio.gather(*futures)


if __name__ == '__main__':
    asyncio.run(main())

Здесь у вас также больше контроля над тем, какой исполнитель (executor) использовать. По сути, мы явно указываем, в каком цикле событий (event loop) работаем и через какой цикл передаём задачи исполнителю. Локальные тесты не показали большой разницы между ProcessPoolExecutor и ThreadPoolExecutor.

1 голос
/ 25 апреля 2020

Вы можете попробовать пакет yahooquery. Он позволяет получать данные цепочек опционов, в том числе асинхронно. Можно передать сразу все 150 символов или перебрать их в цикле:

from yahooquery import Ticker

symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   

import pandas as pd  # needed by pd.concat below; this snippet did not import it

# Option 1: pass every symbol at once (probably want to use a proxy).
ticker = Ticker(symbols, asynchronous=True)
df = ticker.option_chain

# Option 2: walk the list in batches of ``n`` symbols, reusing one Ticker.
# NOTE: this rebinds ``df``, so use either option 1 or option 2, not both.
ticker = Ticker('aapl', asynchronous=True)  # placeholder symbol, replaced inside the loop
n = 10
dataframes = []
for i in range(0, len(symbols), n):
    ticker.symbols = symbols[i:i+n]
    dataframes.append(ticker.option_chain)
df = pd.concat(dataframes)
...