У меня довольно базовая c конфигурация задачи. Однако у меня проблема в том, что у меня есть переменная, которая обновляется каждый час. Эта переменная является auth_token для API. Если срок действия auth_token истек, ошибка задачи из-за невозможности подключения к API. Если я попытаюсь обработать записи сразу после обновления auth_token, произойдет ошибка задачи, сообщающая, что срок действия моего auth_token истек. Но затем, если я перезапущу работника, задача будет работать, как задумано, с использованием нового auth_token
. Я просматривал документацию Celery в поисках какой-то конфигурации, которую я мог бы добавить к своей задаче, которая заставила бы его очистить кеш после обработки задачи refresh_access_token
, но не может ничего найти.
Любая помощь будет высоко ценится!
serializer.py
# List Create View
class LedgerAPIView(generics.ListCreateAPIView):
queryset = Ledger.objects.all()
serializer_class = LedgerSerializer
permission_classes = [IsAdminUser]
authentication_classes = [TokenAuthentication, SessionAuthentication]
pagination_class = PageNumberPagination
def perform_create(self, serializer):
"""
Posting to the API requires that a user have an auth_token and be classified
as an active_user.
"""
serializer.save()
# Create ProcessRecord Object
ledger_object = Ledger.objects.get(id=serializer.instance.id)
record = ProcessedRecords(payment_allocation_name=ledger_object)
try:
record.save()
except Exception as exc:
formatted = "Cannot update ProcessedRecords Table! ERROR --> {}".format(
repr(exc))
raise Exception(formatted)
# Start Celery Tasker
process_data = retrieve_sf_data.delay(serializer.instance.payment_allocation_name, serializer.instance.id)
tasks.py
@task(name='retrieve_sf_data')
def retrieve_sf_data(name, id):
ledger_object = Ledger.objects.get(id=id)
db_log = ProcessedRecords.objects.get(payment_allocation_name=ledger_object)
# Set status to WIP
if db_log:
ProcessedRecords.objects.filter(id=db_log.id).update(process_status='WIP')
# Begin ETL Pipeline
status, data = run_sf_pipeline(name)
# Update log to reflect outcome
if status == True:
if db_log:
ProcessedRecords.objects.filter(id=db_log.id).update(sf_data_processed=now)
ProcessedRecords.objects.filter(id=db_log.id).update(process_status='Complete')
ProcessedRecords.objects.filter(id=db_log.id).update(process_ended=now)
else:
if db_log:
ProcessedRecords.objects.filter(id=db_log.id).update(sf_data_processed=now)
ProcessedRecords.objects.filter(id=db_log.id).update(process_status='Data Not Processed --> ERROR!')
# Remove record from database so it can be processed again. The log record will be preserved.
clean_mess.delay(id)
# Task that updates value every hour
@task(name="refresh_access_token")
def refresh_access_token():
""" Task to refresh the access_token provided for SkyApi interactions """
client = SkyApi(request_type='SKY_API_OAUTH_ENDPOINT')
response = client.access_token.get_refresh_token(SUBSCRIPTION_KEY, REFRESH_TOKEN, REDIRECT_URI, APPLICATION_ID, APPLICATION_SECRET)
try:
User.objects.filter(nickname='worker-user').update(sky_api_access_token=response.get('access_token'))
except:
pass