Чтение вывода Firehose в корзине S3, проверка входящих файлов JSON и переход к корзине успехов и ошибок в случае сбоя и сбоя - PullRequest
0 голосов
/ 18 сентября 2018

Я выхожу из Kinesis Firehose в ведре S3.Мне нужно прочитать входящие файлы JSON и проверить, является ли он действительным или нет, а затем переместить его в соответствии с sucess или bucket ошибки в aws.Проблема с данными Kinesis заключается в том, что каждая запись помещается в одну строку, и между двумя записями нет запятой.Я написал код, но мне нужна помощь в его точной настройке, так как я не могу разобрать все записи и как направить записи об ошибках.я ставлю выходные данные кинезиса и лямбда, которые я написал.любая помощь будет высоко оценена.Данные Kinesis Json:

{"clinicalStatus":"active","informationSource":{"reference":"string","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"meta":{"lastUpdated":"2018-08-08T10:58:15.962Z","lastUpdatedBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","versionId":1,"isDeleted":false,"createdBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","created":"2018-08-08T10:58:15.962Z"},"_uniqueId":"d06f88e0-84d0-4d68-a8b1-57882398030b1234","onsetDateTime":"2017-01-11","asserter":{"reference":"Practitioner/1912007","display":"Forrest, Fhir"},"patient":{"reference":"Patient/4342012","display":"Smart, Timmy","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"code":{"text":"Test Health Concern"},"dateRecorded":"2017-01-11","abatementBoolean":false,"resourceType":"Condition","category":{"coding":[{"code":"health-concern","system":"http://argonaut.hl7.org","display":"Health Concern"}],"text":"Health Concern"},"_userId":"06f1685a-2f04-4c15-bb1c-0d4cece8e1831234","verificationStatus":"confirmed","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183.d06f88e0-84d0-4d68-a8b1-57882398030b","tableName":"FhirCondition"},
{"clinicalStatus":"active","informationSource":{"reference":"string","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"meta":{"lastUpdated":"2018-08-08T10:58:15.962Z","lastUpdatedBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","versionId":1,"isDeleted":false,"createdBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","created":"2018-08-08T10:58:15.962Z"},"_uniqueId":"d06f88e0-84d0-4d68-a8b1-57882398030b12345","onsetDateTime":"2017-01-11","asserter":{"reference":"Practitioner/1912007","display":"Forrest, Fhir"},"patient":{"reference":"Patient/4342012","display":"Smart, Timmy","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"code":{"text":"Test Health Concern"},"dateRecorded":"2017-01-11","abatementBoolean":false,"resourceType":"Condition","category":{"coding":[{"code":"health-concern","system":"http://argonaut.hl7.org","display":"Health Concern"}],"text":"Health Concern"},"_userId":"06f1685a-2f04-4c15-bb1c-0d4cece8e18312345","verificationStatus":"confirmed","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183.d06f88e0-84d0-4d68-a8b1-57882398030b","tableName":"FhirCondition"}

Лямбда-код:

import boto3
import json
import sys
import jsonschema
from jsonschema import validate

s3_client = boto3.client('s3')
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    webUrl = {"definitions":{},"$schema":"http://json-schema.org/draft-07/schema#","$id":"http://example.com/root.json","type":"object","title":"The Root Schema","required":["clinicalStatus"],"properties":{"clinicalStatus":{"$id":"#/properties/clinicalStatus","type":"string","title":"The Clinicalstatus Schema","default":"","examples":["active"],"pattern":"^(.*)$"}}}
    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    print("Bucket Name:",bucket_name)
    key = event["Records"][0]["s3"]["object"]["key"]
    print("Key Name:",key)
    target_key = 'FHIR/'+key[8:]
    print(target_key)
    obj = s3_client.get_object(Bucket=bucket_name, Key=key)
    obj_read = obj['Body'].read().decode('utf-8')
    json_Dict = '[' + obj_read.replace("\n", ",", obj_read.count("\n")-1) + ']'
    data = json.loads(json_Dict)


    for idx, item in enumerate(data):
        try:
            validate(data[idx],webUrl)
            sys.stdout.write("Record #{}: OK\n".format(idx))                            
            s3_client.put_object(Bucket = bucket_name, Key=target_key, Body=json.dumps(data["{}"])+ '\n' )            
        except jsonschema.exceptions.ValidationError as ve:
            sys.stderr.write("Record #{}: ERROR\n".format(idx))
            sys.stderr.write(str(ve) + "\n")
...