Джанго - Сельдерей - SQS - S3 - Получение сообщений - PullRequest
0 голосов
/ 08 ноября 2019

У меня есть приложение Django, и я использую Celery, SQS и S3. Когда я запускаю следующую функцию, используя Django, Celery и SQS, эта функция работает и каждую минуту печатает «привет», как и должно быть.

from celery.task import periodic_task
from celery.schedules import crontab
@periodic_task(run_every=crontab(hour='*', minute='*', day_of_week="*"))
def print_hello():
    print('hello world')

Но приложение также связано с S3 Bucket. Каждый раз, когда новый файл сохраняется в S3, в очередь SQS отправляется уведомление. Проблема возникает, когда сообщение уведомления отправляется в очередь SQS. Когда уведомление достигает очереди, рабочий не работает. Останавливает периодическую задачу print_hello (), выдает следующее сообщение об ошибке:

[2019-11-07 22: 10: 57,173: CRITICAL / MainProcess] Неустранимая ошибка: Ошибка («Неверное заполнение»). ..parserinvoker / lib64 / python3.7 / base64.py ", строка 87, в b64decode возвращает binascii.a2b_base64 (s) binascii.Error: Неверное заполнение

, а затем завершается. Я былпросматривал документацию и всю неделю пытался устранить неполадки и не нашел решения. Я включаю мои settings.py на случай, если это проблема конфигурации

Settings.py

BROKER_URL = "sqs://"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = env('CELERY_DEFAULT_QUEUE')
CELERY_RESULT_BACKEND = None 
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'polling_interval':20,
    'visibility_timeout': 3600,
    'task_default_queue': env('CELERY_DEFAULT_QUEUE'),
}

1 Ответ

1 голос
/ 08 ноября 2019

Формат полезной нагрузки json, которую ожидает сельдерей в очереди, отличается от формата, который SQS получает от s3;для их правильной обработки может потребоваться отдельная периодическая задача, которая периодически проверяет их и истощает очередь уведомлений s3 вместо отправки уведомлений s3 в очередь брокера сельдерея. Тело сообщения s3 будет выглядеть как , описанное в документации Amazon здесь . Вот пример записи 2.1, отправляемой из S3 в SQS:

   "Records":[  
      {  
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request,
         "eventName":"event-type",
         "userIdentity":{  
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{  
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{  
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{  
               "name":"bucket-name",
               "ownerIdentity":{  
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{  
               "key":"object-key",
               "size":object-size,
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, 
                   only used with PUTs and DELETEs"
            }
         },
         "glacierEventData": {
            "restoreEventData": {
               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass": "Source storage class for restore"
            }
         }
      }
   ]
}

Формат сообщения сельдерея выглядит следующим образом .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...