моя цель - извлечь из полезной нагрузки json, полученной от брокера kafka, определенное поле со своим значением (например, key: value). Мне нужно разработать этот скрипт с использованием Python. В настоящее время я читаю сообщение от брокера, но не могу разобрать json.
Вот несколько строк кода, которые я написал.
sc=SparkContext("local[*]","DEMO-APP")
ssc=StreamingContext(sc,10)
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")
topic="mqtt"
broker="localhost:9092"
dstream=KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list": broker},keyDecoder=lambda x: x.decode('ascii',errors='ignore'), valueDecoder=lambda x: x.decode('ascii',errors='ignore'))
raw_msg=dstream.map(lambda k: k[1]).pprint()
Вот сообщение, которое я получаю:
{"r":[{"k":"coordinator","c":"coordinator","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"tz":"2019-10-11T16:13:00.041+02:00"},{"c":"nexthop","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"u":"dB","tz":"2019-10-11T16:13:00.041+02:00","v":-43.5,"k":"rssi"},{"c":"nexthop","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"u":"dB","tz":"2019-10-11T16:13:00.041+02:00","v":-120.0,"k":"rssi.ook"}],"ref":"jzp://edv#2005.0000","address":"jz://edv#2005-0000000500000001","cuid":"c61c325a-9c0c-4235-9cbe-9aa006b7972c","t":1570803180041,"tz":"2019-10-11T16:13:00.041+02:00","cat":"0694","ruid":[],"type":"Automation","uuid":"aeaa5632-fbb6-4a82-b340-1f60374cad7c","m":[{"k":"battery_level","t":1570803180041,"u":"V","tz":"2019-10-11T16:13:00.041+02:00","v":3.25},{"k":"device_temperature","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":38.0},{"k":"channel_00","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":0.0},{"k":"channel_01","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":0.0}]}
Из вышеприведенного сообщения я хотел бы извлечь все значения "k" и значения "v" в разделе "m "(например, k: battery_level, v: 3.25 ...)
Я надеюсь, что кто-то может мне помочь. Спасибо!