Я использую асинхронную очередь Python для загрузки файлов в сети, но когда я запускаю 3 рабочих для использования очереди, остается только одна работающая - PullRequest
0 голосов
/ 29 сентября 2018

Я новичок в использовании функции Python aysnc.Я хочу начать 3 работника для массовой загрузки файлов в сети. И я использую функцию очереди Python aysnc.но когда я запускаю программу, в процессе запускается только один рабочий.

import requests
import json
import common
import asyncio
import time


def http_get(index,type,tag):
    page_res = requests.get('{}/{}/{}/{}'.format(common.es_url,index,type,tag), headers={
            'Content-Type': 'application/json'
        })
    return page_res.status_code, page_res.content


def http_post(index, type, tag, o):
    resp = requests.post('{}/{}/{}/{}'.format(common.es_url, index, type, tag), headers={
        'Content-Type': 'application/json'
    }, data=json.dumps(o))
    return resp


def get_pubchem_tag():
    status_code, content = http_get('configure', '_doc', 'pubchem_tag')
    # print(page_res.status_code)

    if status_code == 404:
        bak_status_code, bak_content = http_get('configure_bak', '_doc', 'pubchem_tag')
        if bak_status_code == 404:
            o = {
                'key': 'pubchem_page',
                'record':0
            }
            http_post('configure', '_doc', 'pubchem_tag', o)
            http_post('configure_bak', '_doc', 'pubchem_tag', o)
            return o['record']
        else:
            bak_content_obj = json.loads(bak_content)
            http_post('configure', '_doc', 'pubchem_tag', bak_content_obj['_source'])
            return bak_content_obj['_source']['record']
    else:
        content_obj = json.loads(content)
        return content_obj['_source']['record']


async def get_pubchem_worker(name, queue):
    while True:
        page = await queue.get()
        url = 'https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{}/JSON/?response_type=display'.format(page)

        html_resp = requests.get(url)
        if html_resp.status_code in {200, 201}:
            obj = json.loads(html_resp.content)
            o = {
                "key": obj['Record']['RecordNumber'],
                "value": json.dumps(obj['Record'])
            }
            r = http_post('html_compounds', '_doc', obj['Record']['RecordNumber'], o)
            if r.status_code in {200, 201}:
                o = {
                    'record': page
                }
                r = http_post('configure', '_doc', 'pubchem_tag', o)
                if r.status_code not in {200, 201}:
                    http_post('configure', '_doc', 'pubchem_tag', o)
                r = http_post('configure_bak', '_doc', 'pubchem_tag', o)
                if r.status_code not in {200, 201}:
                    http_post('configure_bak', '_doc', 'pubchem_tag', o)
                print(f'{name} has get pubchem record, id is {page}')
            else:
                o = {
                    "key": page,
                    "exception": "Pubchem put es exception"
                }
                http_post('pubchem_exception', '_doc', page, o)
                print(f'{name} got exception in Pubchem put es process, id is {page}, exception is {r.content}')

        else:
            o = {
                "key": page,
                "exception": "Pubchem get http exception"
            }
            http_post('pubchem_exception', '_doc', page, o)
            print(f'{name} got exception in Pubchem get HTTP process, id is {page}, exception is {html_resp.content}')

        queue.task_done()


async def multiprocess_get_pubchem():
    page = get_pubchem_tag()
    queue = asyncio.Queue()

    for i in range(1, 10):
        queue.put_nowait(page+i)

    workers = []
    for i in range(3):
        worker = asyncio.create_task(get_pubchem_worker(f'worker-{i}', queue))
        workers.append(worker)

    started_at = time.monotonic()
    await queue.join()
    total_execute_time = time.monotonic() - started_at
    print(f'Total execute time is {total_execute_time:.2f}')


if __name__ == '__main__':
    asyncio.run(multiprocess_get_pubchem())

и я получаю следующий результат:

worker-0 has get pubchem record, id is 134
worker-0 has get pubchem record, id is 135
worker-0 has get pubchem record, id is 136
worker-0 has get pubchem record, id is 137
worker-0 has get pubchem record, id is 138
worker-0 has get pubchem record, id is 139
worker-0 has get pubchem record, id is 140
worker-0 has get pubchem record, id is 141
worker-0 has get pubchem record, id is 142
Total execute time is 12.53

Я следую инструкциям на сайте https://docs.python.org/dev/library/asyncio.html

эта проблема смутила меня, но когда я удаляю детали (в то время как True) в функции get_pubchem_worker, другие 2 работника начинают работать, но все они блокируются при выполнении один раз.Кто-нибудь может помочь мне решить эту проблему? Большое спасибо


Когда я добавляю

await asyncio.sleep(0.1)

до

queue.task_done()

, другой работник начинает работать!Работники, которых я создал, следят друг за другом при срабатывании await asyncio.sleep(0.1)?

...