Освежитель задачи сельдерея - PullRequest
0 голосов
/ 26 февраля 2020

У меня довольно базовая c конфигурация задачи. Однако у меня проблема в том, что у меня есть переменная, которая обновляется каждый час. Эта переменная является auth_token для API. Если срок действия auth_token истек, ошибка задачи из-за невозможности подключения к API. Если я попытаюсь обработать записи сразу после обновления auth_token, произойдет ошибка задачи, сообщающая, что срок действия моего auth_token истек. Но затем, если я перезапущу работника, задача будет работать, как задумано, с использованием нового auth_token. Я просматривал документацию Celery в поисках какой-то конфигурации, которую я мог бы добавить к своей задаче, которая заставила бы его очистить кеш после обработки задачи refresh_access_token, но не может ничего найти.

Любая помощь будет высоко ценится!

serializer.py

# List Create View
class LedgerAPIView(generics.ListCreateAPIView):
    queryset = Ledger.objects.all()
    serializer_class = LedgerSerializer
    permission_classes = [IsAdminUser]
    authentication_classes = [TokenAuthentication, SessionAuthentication]
    pagination_class = PageNumberPagination

    def perform_create(self, serializer):
        """
        Posting to the API requires that a user have an auth_token and be classified
        as an active_user.
        """
        serializer.save()
        # Create ProcessRecord Object 
        ledger_object = Ledger.objects.get(id=serializer.instance.id)
        record = ProcessedRecords(payment_allocation_name=ledger_object)
        try:
            record.save()
        except Exception as exc:
            formatted = "Cannot update ProcessedRecords Table! ERROR --> {}".format(
                repr(exc))
            raise Exception(formatted)

        # Start Celery Tasker
        process_data = retrieve_sf_data.delay(serializer.instance.payment_allocation_name, serializer.instance.id)

tasks.py

@task(name='retrieve_sf_data')
def retrieve_sf_data(name, id):
    ledger_object = Ledger.objects.get(id=id)
    db_log = ProcessedRecords.objects.get(payment_allocation_name=ledger_object)

    # Set status to WIP
    if db_log: 
        ProcessedRecords.objects.filter(id=db_log.id).update(process_status='WIP')

    # Begin ETL Pipeline
    status, data = run_sf_pipeline(name)

    # Update log to reflect outcome
    if status == True: 
        if db_log: 
            ProcessedRecords.objects.filter(id=db_log.id).update(sf_data_processed=now)
            ProcessedRecords.objects.filter(id=db_log.id).update(process_status='Complete')
            ProcessedRecords.objects.filter(id=db_log.id).update(process_ended=now)
    else: 
        if db_log: 
            ProcessedRecords.objects.filter(id=db_log.id).update(sf_data_processed=now)
            ProcessedRecords.objects.filter(id=db_log.id).update(process_status='Data Not Processed --> ERROR!')
            # Remove record from database so it can be processed again. The log record will be preserved. 
            clean_mess.delay(id)


# Task that updates value every hour

@task(name="refresh_access_token")
def refresh_access_token():
    """ Task to refresh the access_token provided for SkyApi interactions """
    client = SkyApi(request_type='SKY_API_OAUTH_ENDPOINT')
    response = client.access_token.get_refresh_token(SUBSCRIPTION_KEY, REFRESH_TOKEN, REDIRECT_URI, APPLICATION_ID, APPLICATION_SECRET)

    try: 
      User.objects.filter(nickname='worker-user').update(sky_api_access_token=response.get('access_token')) 
    except: 
        pass

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...