Я выхожу из Kinesis Firehose в ведре S3.Мне нужно прочитать входящие файлы JSON и проверить, является ли он действительным или нет, а затем переместить его в соответствии с sucess или bucket ошибки в aws.Проблема с данными Kinesis заключается в том, что каждая запись помещается в одну строку, и между двумя записями нет запятой.Я написал код, но мне нужна помощь в его точной настройке, так как я не могу разобрать все записи и как направить записи об ошибках.я ставлю выходные данные кинезиса и лямбда, которые я написал.любая помощь будет высоко оценена.Данные Kinesis Json:
{"clinicalStatus":"active","informationSource":{"reference":"string","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"meta":{"lastUpdated":"2018-08-08T10:58:15.962Z","lastUpdatedBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","versionId":1,"isDeleted":false,"createdBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","created":"2018-08-08T10:58:15.962Z"},"_uniqueId":"d06f88e0-84d0-4d68-a8b1-57882398030b1234","onsetDateTime":"2017-01-11","asserter":{"reference":"Practitioner/1912007","display":"Forrest, Fhir"},"patient":{"reference":"Patient/4342012","display":"Smart, Timmy","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"code":{"text":"Test Health Concern"},"dateRecorded":"2017-01-11","abatementBoolean":false,"resourceType":"Condition","category":{"coding":[{"code":"health-concern","system":"http://argonaut.hl7.org","display":"Health Concern"}],"text":"Health Concern"},"_userId":"06f1685a-2f04-4c15-bb1c-0d4cece8e1831234","verificationStatus":"confirmed","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183.d06f88e0-84d0-4d68-a8b1-57882398030b","tableName":"FhirCondition"},
{"clinicalStatus":"active","informationSource":{"reference":"string","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"meta":{"lastUpdated":"2018-08-08T10:58:15.962Z","lastUpdatedBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","versionId":1,"isDeleted":false,"createdBy":"45de787a-cf38-4ecd-9541-d7ba641ecb4e","created":"2018-08-08T10:58:15.962Z"},"_uniqueId":"d06f88e0-84d0-4d68-a8b1-57882398030b12345","onsetDateTime":"2017-01-11","asserter":{"reference":"Practitioner/1912007","display":"Forrest, Fhir"},"patient":{"reference":"Patient/4342012","display":"Smart, Timmy","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183"},"code":{"text":"Test Health Concern"},"dateRecorded":"2017-01-11","abatementBoolean":false,"resourceType":"Condition","category":{"coding":[{"code":"health-concern","system":"http://argonaut.hl7.org","display":"Health Concern"}],"text":"Health Concern"},"_userId":"06f1685a-2f04-4c15-bb1c-0d4cece8e18312345","verificationStatus":"confirmed","id":"06f1685a-2f04-4c15-bb1c-0d4cece8e183.d06f88e0-84d0-4d68-a8b1-57882398030b","tableName":"FhirCondition"}
Лямбда-код:
import boto3
import json
import sys
import jsonschema
from jsonschema import validate
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
def lambda_handler(event, context):
webUrl = {"definitions":{},"$schema":"http://json-schema.org/draft-07/schema#","$id":"http://example.com/root.json","type":"object","title":"The Root Schema","required":["clinicalStatus"],"properties":{"clinicalStatus":{"$id":"#/properties/clinicalStatus","type":"string","title":"The Clinicalstatus Schema","default":"","examples":["active"],"pattern":"^(.*)$"}}}
bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
print("Bucket Name:",bucket_name)
key = event["Records"][0]["s3"]["object"]["key"]
print("Key Name:",key)
target_key = 'FHIR/'+key[8:]
print(target_key)
obj = s3_client.get_object(Bucket=bucket_name, Key=key)
obj_read = obj['Body'].read().decode('utf-8')
json_Dict = '[' + obj_read.replace("\n", ",", obj_read.count("\n")-1) + ']'
data = json.loads(json_Dict)
for idx, item in enumerate(data):
try:
validate(data[idx],webUrl)
sys.stdout.write("Record #{}: OK\n".format(idx))
s3_client.put_object(Bucket = bucket_name, Key=target_key, Body=json.dumps(data["{}"])+ '\n' )
except jsonschema.exceptions.ValidationError as ve:
sys.stderr.write("Record #{}: ERROR\n".format(idx))
sys.stderr.write(str(ve) + "\n")