Как я могу прочитать определенное поле сообщения json, используя Spark Streaming? - PullRequest
0 голосов
/ 11 октября 2019

моя цель - извлечь из полезной нагрузки json, полученной от брокера kafka, определенное поле со своим значением (например, key: value). Мне нужно разработать этот скрипт с использованием Python. В настоящее время я читаю сообщение от брокера, но не могу разобрать json.

Вот несколько строк кода, которые я написал.

sc=SparkContext("local[*]","DEMO-APP")
ssc=StreamingContext(sc,10)
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")

topic="mqtt"
broker="localhost:9092"

dstream=KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list": broker},keyDecoder=lambda x: x.decode('ascii',errors='ignore'), valueDecoder=lambda x: x.decode('ascii',errors='ignore'))

raw_msg=dstream.map(lambda k: k[1]).pprint()

Вот сообщение, которое я получаю:

{"r":[{"k":"coordinator","c":"coordinator","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"tz":"2019-10-11T16:13:00.041+02:00"},{"c":"nexthop","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"u":"dB","tz":"2019-10-11T16:13:00.041+02:00","v":-43.5,"k":"rssi"},{"c":"nexthop","d":"jzp://coo#0000000500000001.0000","t":1570803180041,"u":"dB","tz":"2019-10-11T16:13:00.041+02:00","v":-120.0,"k":"rssi.ook"}],"ref":"jzp://edv#2005.0000","address":"jz://edv#2005-0000000500000001","cuid":"c61c325a-9c0c-4235-9cbe-9aa006b7972c","t":1570803180041,"tz":"2019-10-11T16:13:00.041+02:00","cat":"0694","ruid":[],"type":"Automation","uuid":"aeaa5632-fbb6-4a82-b340-1f60374cad7c","m":[{"k":"battery_level","t":1570803180041,"u":"V","tz":"2019-10-11T16:13:00.041+02:00","v":3.25},{"k":"device_temperature","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":38.0},{"k":"channel_00","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":0.0},{"k":"channel_01","t":1570803180041,"u":"","tz":"2019-10-11T16:13:00.041+02:00","v":0.0}]}

Из вышеприведенного сообщения я хотел бы извлечь все значения "k" и значения "v" в разделе "m "(например, k: battery_level, v: 3.25 ...)

Я надеюсь, что кто-то может мне помочь. Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...