Эффективные запросы API во время итерации - PullRequest
0 голосов
/ 30 апреля 2020

Так что я ищу способ ускорить вывод следующего кода, вызывая API естественного языка Google:

tweets = json.load(input)

client = language.LanguageServiceClient()

sentiment_tweets = []

iterations = 1000

start = timeit.default_timer()

for i, text in enumerate(d['text'] for d in tweets):

    document = types.Document(
    content=text,
    type=enums.Document.Type.PLAIN_TEXT)

    sentiment = client.analyze_sentiment(document=document).document_sentiment

    results = {'text': text, 'sentiment':sentiment.score, 'magnitude':sentiment.magnitude}

    sentiment_tweets.append(results)

    if (i % iterations) == 0:
        print(i, " tweets processed")

sentiment_tweets_json = [json.dumps(sentiments) for sentiments in sentiment_tweets]

stop = timeit.default_timer()

Проблема в том, что список твитов составляет около 100 000 записей, повторяется и делает Вызовы один за другим не производят вывод в допустимом масштабе времени. Я изучаю возможность использования asyncio для параллельных вызовов, хотя, поскольку я все еще новичок с Python и не знаком с пакетом, я не уверен, сможете ли вы сделать функцию сопрограммой с самой собой, чтобы каждый экземпляр Функция выполняет итерацию по списку, как и ожидалось, последовательно. Существует также вопрос управления общим количеством вызовов, сделанных приложением, в рамках определенных квотных ограничений API. Просто хотел узнать, иду ли я в правильном направлении.

Ответы [ 2 ]

0 голосов
/ 30 апреля 2020

Я использую этот метод для одновременных вызовов:

from concurrent import futures as cf

def execute_all(mfs: list, max_workers: int = None):
    """Excecute concurrently and mfs list.

    Parameters
    ----------
    mfs : list
        [mfs1, mfs2,...]
        mfsN = {
            tag: str,
            fn: function,
            kwargs: dict
        }
        .
    max_workers : int
        Description of parameter `max_workers`.

    Returns
    -------
    dict
        {status, result, error}
        status = {tag1, tag2,..}
        result = {tag1, tag2,..}
        error = {tag1, tag2,..}

    """
    result = {
        'status': {},
        'result': {},
        'error': {}
    }
    max_workers = len(mfs)
    with cf.ThreadPoolExecutor(max_workers=max_workers) as exec:
        my_futures = {
            exec.submit(x['fn'], **x['kwargs']): x['tag'] for x in mfs
        }
        for future in cf.as_completed(my_futures):
            tag = my_futures[future]
            try:
                result['result'][tag] = future.result()
                result['status'][tag] = 0
            except Exception as err:
                result['error'][tag] = err
                result['result'][tag] = None
                result['status'][tag] = 1
    return result

Где каждый результат возвращает индексированный данным тегом (если вам важно определить, какой вызов возвращает какой результат), когда:

mfs = [
    {
        'tag': 'tweet1',
        'fn': process_tweet,
        'kwargs': {
            'tweet': tweet1
        }
    },
    {
        'tag': 'tweet2',
        'fn': process_tweet,
        'kwargs': {
            'tweet': tweet2
        }
    },
]

results = execute_all(mfs, 2)
0 голосов
/ 30 апреля 2020

Хотя asyn c - это один способ, которым вы могли бы go, другой, который может быть проще, - это использование многопроцессорных функций python.

from multiprocessing import Pool

def process_tweet(tweet):
    pass # Fill in the blanks here

# Use five processes at once
with Pool(5) as p:
    processes_tweets = p.map(process_tweet, tweets, 1)

В этом случае «твиты» - это какой-то итератор, и каждый элемент этого итератора будет передан вашей функции. Функция map обеспечит возвращение результатов в том же порядке, в котором были предоставлены аргументы.

...