Мне нужно написать функцию с потоковыми данными из Kinesis, а затем отфильтровать столбец, значение которого больше 2.
Это моя функция:
from __future__ import print_function
import ast
import re
from aws_kinesis_agg.deaggregator import deaggregate_record
import base64
def encode_dict(d, codec='utf8'):
ks = d.keys()
for k in ks:
val = d.pop(k)
if isinstance(val, unicode):
val = val.encode(codec)
elif isinstance(val, dict):
val = encode_dict(val, codec)
if isinstance(k, unicode):
k = k.encode(codec)
d[k] = val
return d
def construct_response_record(record_id, data, is_parse_success):
return {
'recordId': record_id,
'result': 'Ok' if is_parse_success else 'ProcessingFailed',
'data': base64.b64encode(data)}
def process_kpl_record(kpl_record):
data = kpl_record['kinesis']['data']
raw_kpl_record_data = base64.b64decode(data.encode('utf-8'))
#print(raw_kpl_record_data)
try:
# Concatenate the data from de-aggregated records into a single output payload.
output_data = "".join(deaggregate_record(raw_kpl_record_data))
#return construct_response_record(kpl_record['recordId'], output_data, True)
except BaseException as e:
print('Processing failed with exception:' + str(e))
#return construct_response_record(kpl_record['recordId'], raw_kpl_record_data, False)
return output_data
def lambda_handler(event, context):
#`#'''A Python AWS Lambda function to process aggregated records sent to KinesisAnalytics.''
for raw_kpl_records in event['Records']:
raw_kpl_records = encode_dict(raw_kpl_records)
#print(raw_kpl_records)
output= process_kpl_record(raw_kpl_records)
print(output[1:3])
return {'records': output}
И мой вывод - это что-то например:
{"personline160":"0","Line160Wip":"0","personline170":"0","Line170Wip":"0","personothers":"1","frameTimeCode":821,"Line160Wt":"0","Line170Wt":"0","fragmentNumberString":"91343852333432021474203496186679955002613919290","timestamp":"2020-03-13 09:29:10.00029"}{"personline160":"0","Line160Wip":"0","personline170":"0","Line170Wip":"0","personothers":"1","frameTimeCode":1660,"Line160Wt":"0","Line170Wt
{
"personline160": "0",
"Line160Wip": "0",
"personline170": "0",
"Line170Wip": "0",
"personothers": "1",
"frameTimeCode": 821,
"Line160Wt": "0",
"Line170Wt": "0",
"fragmentNumberString": "91343852333432021474203496186679955002613919290",
"timestamp": "2020-03-13 09:29:10.00029"
}
{
"personline160": "0",
"Line160Wip": "0",
"personline170": "0",
"Line170Wip": "0",
"personothers": "1",
"frameTimeCode": 1660,
"Line160Wt": "0",
"Line170Wt": "0",
"fragmentNumberString": "91343852333432021474203496186679955002613919290",
"timestamp": "2020-03-13 09:29:11.00029"
}
{
"personline160": "0",
"Line160Wip": "0",
"personline170": "0",
"Line170Wip": "0",
"personothers": "1",
"frameTimeCode": 2487,
"Line160Wt": "0",
"Line170Wt": "0",
"fragmentNumberString": "91343852333432021474203496186679955002613919290",
"timestamp": "2020-03-13 09:29:12.00029"
}
{
"personline160": "0",
"Line160Wip": "0",
"personline170": "0",
"Line170Wip": "0",
"personothers": "1",
"frameTimeCode": 3326,
"Line160Wt": "0",
"Line170Wt": "0",
"fragmentNumberString": "91343852333432021474203496186679955002613919290",
"timestamp": "2020-03-13 09:29:13.00029"
}
Но тип вывода - строка, и мне нужно изменить его на соответствующий формат, как я могу это сделать? Я попробовал JSON.load
, eval()
функцию, но она не работала