Я основываю эту задачу на документации, предоставленной здесь Google: https://cloud.google.com/appengine/docs/flexible/python/writing-and-responding-to-pub-sub-messages
Я закончил тем, что пришел с этим кодом для моего основного, который извлекает глобально хранимую переменную для сообщений и вместо этого записывает каждое из них.в плоский файл.
# [START push]
@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
if (request.args.get('token', '') != current_app.config['PUBSUB_VERIFICATION_TOKEN']):
return 'Invalid request', 400
# Decode the data
envelope = json.loads(request.data.decode('utf-8'))
# Current time in UTC
current_date = datetime.utcnow()
payload = json.loads(base64.b64decode(envelope['message']['data']))
# Normalize and flatten the data
if "events" in payload and payload['events']:
payload['events'] = payload['events'][0]['data']
payload = parse_dict(init=payload, sep='_')
# Now jsonify all remaining lists and dicts
for key in payload.keys():
value = payload[key]
if isinstance(value, list) or isinstance(value, dict):
if value:
value = json.dumps(value)
else:
value = None
payload[key] = value
# Custom id with the message id and date string
id = "{}.{}".format(
payload['timestamp_unixtime_ms'],
payload['message_id']
)
filename_hourly = 'landing_path/{date}/{hour}/{id}.json'.format(
date=current_date.strftime("%Y%m%d"),
hour=current_date.strftime("%H"),
id=id
)
blob = bucket.get_blob(filename_hourly)
if blob: # We already have this file, skip this message
print('Already have {} stored.'.format(filename_hourly))
return 'OK', 200
blob_hourly = Blob(bucket=bucket, name=filename_hourly)
blob_hourly.upload_from_string(json.dumps(payload, indent=2, sort_keys=True))
# Returning any 2xx status indicates successful receipt of the message.
return 'OK', 200
# [END push]
Это прекрасно работает, но я получаю тонну ошибок 502 и 504.Он отображается здесь, на моей инструментальной панели стека-драйверов.
Я предполагаю, что загрузка файлов занимает слишком много времени, но я не уверен, что делатьпоступай иначе.Какие-либо предложения?Ресурсы на блоках appengine довольно низки, и мои ограничения API даже не близки.
Предложения кому-нибудь?