Как сделать мою программу асинхронной с asyncio в Python 3.7? - PullRequest
1 голос
/ 22 марта 2019

Мой код для поиска данных из базы данных ES и добавления новых данных в базу данных.Раньше я работал в одном процессе, но это слишком неэффективно, поэтому я хочу добавить asyncio в мой код.Как мне это сделать?Я удалил URL своих данных ES, исходя из соображений безопасности.кто-нибудь может мне помочь?

import json
import requests
from elasticsearch import Elasticsearch
import asyncio

class Cited:
    def __init__(self):
        self.es = Elasticsearch(
            [''],
        )

    async def get_es_item(self):
        query_body = {
            "from": 0,
            "size": 10000
            ,

            "query": {
                "bool": {
                    "must":
                        {
                            "exists": {
                                "field": "extra.S2PaperId"
                            }
                        }
                    , "must_not":
                        {"exists": {
                            "field": "extra.citations"

                        }}

                }
            }
        }
        items = self.es.search(index='item', body=query_body, doc_type=None, request_timeout=6000)
        items = items['hits']['hits']
        for item in items:
            item_type = item['_type']
            item_id = item['_id']
            S2PaperId = item['_source']['extra']['S2PaperId']
            self.search_ss(item_id=item_id, paperId=S2PaperId, item_type=item_type)

    def search_ss(self, item_id, paperId, item_type):
        headers = {
            'user-agent': 'Mozilla/5.0 (Macintosh Intel Mac OS X 10_13_4) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
        }

        req = requests.get('https://api.semanticscholar.org/v1/paper/' + paperId, headers=headers, timeout=100)

        # logging.info(req.url)
        if req.status_code == 200:
            req = json.loads(req.text)
            citations = len(req['citations'])
            citationVelocity = req['citationVelocity']
            influentialCitationCount = req['influentialCitationCount']

            self.es.update(index='item', doc_type=item_type, id=item_id,
                           body={'doc': {'extra': {'citations': citations, 'citationVelocity': citationVelocity,
                                                   'influentialCitationCount': influentialCitationCount}}},
                           request_timeout=6000)

            print(item_id, item_type, citations, citationVelocity, influentialCitationCount)
        else:
            print('s2paper 出错了 直接补0' + item_id, item_type)
            self.es.update(index='item', doc_type=item_type, id=item_id,
                           body={'doc': {'extra': {'citations': 0, 'citationVelocity': 0,
                                                   'influentialCitationCount': 0}}},
                           request_timeout=6000)

cited = Cited()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(cited.get_es_item(), cited.get_es_item(), cited.get_es_item(), cited.get_es_item()))
...