Я новичок в использовании функции Python aysnc.Я хочу начать 3 работника для массовой загрузки файлов в сети. И я использую функцию очереди Python aysnc.но когда я запускаю программу, в процессе запускается только один рабочий.
import requests
import json
import common
import asyncio
import time
def http_get(index,type,tag):
page_res = requests.get('{}/{}/{}/{}'.format(common.es_url,index,type,tag), headers={
'Content-Type': 'application/json'
})
return page_res.status_code, page_res.content
def http_post(index, type, tag, o):
resp = requests.post('{}/{}/{}/{}'.format(common.es_url, index, type, tag), headers={
'Content-Type': 'application/json'
}, data=json.dumps(o))
return resp
def get_pubchem_tag():
status_code, content = http_get('configure', '_doc', 'pubchem_tag')
# print(page_res.status_code)
if status_code == 404:
bak_status_code, bak_content = http_get('configure_bak', '_doc', 'pubchem_tag')
if bak_status_code == 404:
o = {
'key': 'pubchem_page',
'record':0
}
http_post('configure', '_doc', 'pubchem_tag', o)
http_post('configure_bak', '_doc', 'pubchem_tag', o)
return o['record']
else:
bak_content_obj = json.loads(bak_content)
http_post('configure', '_doc', 'pubchem_tag', bak_content_obj['_source'])
return bak_content_obj['_source']['record']
else:
content_obj = json.loads(content)
return content_obj['_source']['record']
async def get_pubchem_worker(name, queue):
while True:
page = await queue.get()
url = 'https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{}/JSON/?response_type=display'.format(page)
html_resp = requests.get(url)
if html_resp.status_code in {200, 201}:
obj = json.loads(html_resp.content)
o = {
"key": obj['Record']['RecordNumber'],
"value": json.dumps(obj['Record'])
}
r = http_post('html_compounds', '_doc', obj['Record']['RecordNumber'], o)
if r.status_code in {200, 201}:
o = {
'record': page
}
r = http_post('configure', '_doc', 'pubchem_tag', o)
if r.status_code not in {200, 201}:
http_post('configure', '_doc', 'pubchem_tag', o)
r = http_post('configure_bak', '_doc', 'pubchem_tag', o)
if r.status_code not in {200, 201}:
http_post('configure_bak', '_doc', 'pubchem_tag', o)
print(f'{name} has get pubchem record, id is {page}')
else:
o = {
"key": page,
"exception": "Pubchem put es exception"
}
http_post('pubchem_exception', '_doc', page, o)
print(f'{name} got exception in Pubchem put es process, id is {page}, exception is {r.content}')
else:
o = {
"key": page,
"exception": "Pubchem get http exception"
}
http_post('pubchem_exception', '_doc', page, o)
print(f'{name} got exception in Pubchem get HTTP process, id is {page}, exception is {html_resp.content}')
queue.task_done()
async def multiprocess_get_pubchem():
page = get_pubchem_tag()
queue = asyncio.Queue()
for i in range(1, 10):
queue.put_nowait(page+i)
workers = []
for i in range(3):
worker = asyncio.create_task(get_pubchem_worker(f'worker-{i}', queue))
workers.append(worker)
started_at = time.monotonic()
await queue.join()
total_execute_time = time.monotonic() - started_at
print(f'Total execute time is {total_execute_time:.2f}')
if __name__ == '__main__':
asyncio.run(multiprocess_get_pubchem())
и я получаю следующий результат:
worker-0 has get pubchem record, id is 134
worker-0 has get pubchem record, id is 135
worker-0 has get pubchem record, id is 136
worker-0 has get pubchem record, id is 137
worker-0 has get pubchem record, id is 138
worker-0 has get pubchem record, id is 139
worker-0 has get pubchem record, id is 140
worker-0 has get pubchem record, id is 141
worker-0 has get pubchem record, id is 142
Total execute time is 12.53
Я следую инструкциям на сайте https://docs.python.org/dev/library/asyncio.html
эта проблема смутила меня, но когда я удаляю детали (в то время как True) в функции get_pubchem_worker, другие 2 работника начинают работать, но все они блокируются при выполнении один раз.Кто-нибудь может помочь мне решить эту проблему? Большое спасибо
Когда я добавляю
await asyncio.sleep(0.1)
до
queue.task_done()
, другой работник начинает работать!Работники, которых я создал, следят друг за другом при срабатывании await asyncio.sleep(0.1)
?