Мой код для поиска данных из базы данных ES и добавления новых данных в базу данных.Раньше я работал в одном процессе, но это слишком неэффективно, поэтому я хочу добавить asyncio в мой код.Как мне это сделать?Я удалил URL своих данных ES, исходя из соображений безопасности.кто-нибудь может мне помочь?
import json
import requests
from elasticsearch import Elasticsearch
import asyncio
class Cited:
def __init__(self):
self.es = Elasticsearch(
[''],
)
async def get_es_item(self):
query_body = {
"from": 0,
"size": 10000
,
"query": {
"bool": {
"must":
{
"exists": {
"field": "extra.S2PaperId"
}
}
, "must_not":
{"exists": {
"field": "extra.citations"
}}
}
}
}
items = self.es.search(index='item', body=query_body, doc_type=None, request_timeout=6000)
items = items['hits']['hits']
for item in items:
item_type = item['_type']
item_id = item['_id']
S2PaperId = item['_source']['extra']['S2PaperId']
self.search_ss(item_id=item_id, paperId=S2PaperId, item_type=item_type)
def search_ss(self, item_id, paperId, item_type):
headers = {
'user-agent': 'Mozilla/5.0 (Macintosh Intel Mac OS X 10_13_4) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
}
req = requests.get('https://api.semanticscholar.org/v1/paper/' + paperId, headers=headers, timeout=100)
# logging.info(req.url)
if req.status_code == 200:
req = json.loads(req.text)
citations = len(req['citations'])
citationVelocity = req['citationVelocity']
influentialCitationCount = req['influentialCitationCount']
self.es.update(index='item', doc_type=item_type, id=item_id,
body={'doc': {'extra': {'citations': citations, 'citationVelocity': citationVelocity,
'influentialCitationCount': influentialCitationCount}}},
request_timeout=6000)
print(item_id, item_type, citations, citationVelocity, influentialCitationCount)
else:
print('s2paper 出错了 直接补0' + item_id, item_type)
self.es.update(index='item', doc_type=item_type, id=item_id,
body={'doc': {'extra': {'citations': 0, 'citationVelocity': 0,
'influentialCitationCount': 0}}},
request_timeout=6000)
cited = Cited()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(cited.get_es_item(), cited.get_es_item(), cited.get_es_item(), cited.get_es_item()))