Google API / Celery (с django / redis) - функция с вызовом api работает как обычный функционал, добавьте .delay (), чтобы добавить ее в очередь асинхронных задач, получите ошибку SSL - PullRequest
0 голосов
/ 02 ноября 2019

Обновление: я обнаружил, что celery даже отлично работает с другими вызовами API, но не с API Google, поэтому мне кажется, что каким-то образом Google API и Celery не совместимы или необходимо изменить некоторые настройки?

У меня есть прекрасная настройка сельдерея в том смысле, что она работает для простых задач, таких как hello world (вызов hello_world.delay () в представлениях django не вызывает проблем).

Я создал еще одну более сложную функцию, включающую создание многихвызывает API для получения данных для обновления базы данных. Просто вызывая эту функцию без сельдерея (update_comments ()), проблем нет, и база данных обновляется (это занимает около 15 секунд или около того ... именно поэтому я хочу использовать сельдерей). В настоящее время функция выполняет десятки обращений к API для обновления модели.

Проблема в том, как только я добавляю .delay (), чтобы сделать это задачей сельдерея. Я получаю следующие ошибки (maxretry ssl error). Я знаю, что у меня правильно настроен сельдерей для базовых задач, так как он отлично работает для базовой функции hello world, но не для этой более сложной функции со многими вызовами API. Я не могу понять, почему существует это несоответствие или есть какие-либо настройки, которые я должен изменить, потому что у меня есть вызов API. В настоящее время просто работает на dev.

Задачи

    @task()
def update_comments():
    for channel in Channel.objects.all():
        search_url='https://www.googleapis.com/youtube/v3/commentThreads'
        params1={
             'part': 'replies,snippet',
             'allThreadsRelatedToChannelId': channel.channel,
              'searchTerms': '#vi',
             'maxResults': 100,
             'order': 'time',
             'key' : settings.YOUTUBE_API_DATA_KEY
        }
        params2={
             'part': 'replies,snippet',
             'allThreadsRelatedToChannelId': channel.channel,
              'searchTerms': '#faq',
             'maxResults': 100,
             'order': 'time',
             'key' : settings.YOUTUBE_API_DATA_KEY
        }
        params3={
             'part': 'replies,snippet',
             'allThreadsRelatedToChannelId': channel.channel,
              'searchTerms': '#save',
             'maxResults': 100,
             'order': 'time',
             'key' : settings.YOUTUBE_API_DATA_KEY
        }
        r1=requests.get(search_url, params=params1)
        r2=requests.get(search_url, params=params2)
        r3=requests.get(search_url, params=params3)
        rj1=r1.json()
        rj2=r2.json()
        rj3=r3.json()

        a=1
        b=1
        c=1
        for x in range(0, len(rj1['items'])):
            if a ==1:
                reply_count=rj1['items'][x]['snippet']['totalReplyCount']
                for y in range (0, reply_count):
                    if rj1['items'][x]['replies']['comments'][y]['snippet']['channelId']==channel.channel:
                        if Comments.objects.filter(question=rj1['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj1['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#vi").exists():
                            a=2
                            break
                        else:
                            #get out of for loop
                            new_comment=Comments(question=rj1['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj1['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#vi")
                            new_comment.save()
                            break
                    else:
                        continue
                    break
            else:
                break


        for x in range(0, len(rj2['items'])):
            if b == 1:
                reply_count=rj2['items'][x]['snippet']['totalReplyCount']
                for y in range (0, reply_count):
                    if rj2['items'][x]['replies']['comments'][y]['snippet']['channelId']==channel.channel:
                        if Comments.objects.filter(question=rj2['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj2['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#faq").exists():
                            b=2
                            break
                        else:
                            #get out of for loop
                            new_comment=Comments(question=rj2['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj2['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#faq")
                            new_comment.save()
                            break
                    else:
                        continue
                    break
            else:
                break


        for x in range(0, len(rj3['items'])):
            if c == 1:
                reply_count=rj3['items'][x]['snippet']['totalReplyCount']
                for y in range (0, reply_count):
                    if rj3['items'][x]['replies']['comments'][y]['snippet']['channelId']==channel.channel:
                        if Comments.objects.filter(question=rj3['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj3['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#save").exists():
                            c=2
                            break
                        else:
                            #get out of for loop
                            new_comment=Comments(question=rj3['items'][x]['snippet']['topLevelComment']['snippet']['textOriginal'], answer=rj3['items'][x]['replies']['comments'][y]['snippet']['textOriginal'], channel=channel, key="#save")
                            new_comment.save()
                            break
                    else:
                        continue
                    break
            else:
                break 

Просмотры

def tes(request):
    update_comments.delay()

Настройки

CELERY_BROKER_URL = 'redis: //127.0.0.1: 6379/0 'BROKER_TRANSPORT =' redis 'CELERY_RESULT_BACKEND =' django-db '

ОШИБКА

 ERROR/ForkPoolWorker-4] Task todo.tasks.update_comments[d368d4eb-d149-4461-a829-e8372cb9a6a5] raised unexpected: ConnectionError(MaxRetryError("HTTPSConnectionPool(host='www.googleapis.com', port=443): Max retries exceeded with url: /youtube/v3/commentThreads?part=replies%2Csnippet&allThreadsRelatedToChannelId=UC_EyncGJh2QuQHhcfuDWL6g&searchTerms=%23vi&maxResults=100&order=time&key=xx (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x109380cd0>: Failed to establish a new connection: [Errno 60] Operation timed out'))"))
Traceback (most recent call last):
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connection.py", line 157, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
    self._validate_conn(conn)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
    conn.connect()
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connection.py", line 334, in connect
    conn = self._new_conn()
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connection.py", line 169, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.VerifiedHTTPSConnection object at 0x109380cd0>: Failed to establish a new connection: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/connectionpool.py", line 720, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='www.googleapis.com', port=443): Max retries exceeded with url: /youtube/v3/commentThreads?part=replies%2Csnippet&allThreadsRelatedToChannelId=UC_EyncGJh2QuQHhcfuDWL6g&searchTerms=%23vi&maxResults=100&order=time&key=xx (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x109380cd0>: Failed to establish a new connection: [Errno 60] Operation timed out'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/xx/Desktop/trydjango/src/todo/tasks.py", line 49, in update_comments
    r1=requests.get(search_url, params=params1)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/api.py", line 75, in get
    return request('get', url, params=params, **kwargs)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/Users/xx/Desktop/trydjango/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.googleapis.com', port=443): Max retries exceeded with url: /youtube/v3/commentThreads?part=replies%2Csnippet&allThreadsRelatedToChannelId=UC_EyncGJh2QuQHhcfuDWL6g&searchTerms=%23vi&maxResults=100&order=time&key=xx (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x109380cd0>: Failed to establish a new connection: [Errno 60] Operation timed out'))
...