Как преобразовать JSON полученные из очереди RabbitMQ в CSV? - PullRequest
0 голосов
/ 02 марта 2020

В настоящее время я использую сообщения из очереди RabbitMQ в моей организации. Каждый день мне нужно отправить sh все сообщения, полученные в CSV-файл, который в конечном итоге окажется в виде таблицы в хранилище данных.

Код всегда прослушивает очередь, и в идеале я хотел бы передать данные в CSV-файл.

#callback funtion on receiving messages
def onMessage(channel, method, properties, body):
    print(body)

while True:
    try:
        #connect
        credentials = pika.PlainCredentials(username, password)
        connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))


channel = connection.channel()
channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True)
        channel.start_consuming()

Вывод, который я получаю после того, как начну использовать очередь, выглядит следующим образом: Это одна строка полученных данных. Он в основном возвращает объект json, однако b '{"metrics": необходимо удалить при использовании объекта json.

b' {"metrics": [{"ci_id" : "СПН-EQSHATA1", "client_id": "39956e6fdb256757567567433333193a", "имя": "deviceHealthScore", "source_id": "Global", "source_management_platform": "XXX", "метка": 1582886099642, "единица":» рассчитывать», "значение": "10,0"}, { "ci_id": "СПН-EQSHATA1", "client_id": "39956e6fdb256757567567433333193a", "название": "configAssuranceScore", "source_id": "Global", "source_management_platform" : "XXX", "метка времени": 1582886099325, "единица": "считать", "значение": "1.0"}, { "ci_id": "СПН-EQSHATA1", "client_id": "39956e6fdb256757567567433333193a", "название" : "imageAssuranceScore", "source_id": "Global", "source_management_platform": "XXX", "метки времени": 1582886099325 "единица": "Count", "значение": "1.0"}, { "ci_id":» SPN-EQSHATA1" , "client_id": "39956e6fdb256757567567433333193a", "имя": "vulnerabilityAssuranceScore", "source_id": "Global", "source_management_platform": "XXX", "метка": 1582886099325, "единица": "рассчитывать" ,"ценность":" 10,0 "}, {" ci_id ":" СПН-EQSHATA1" , "client_id": "39956e6fdb256757567567433333193a", "имя": "overallAssuranceScore", "source_id": "Global", "source_management_platform": "XXX", "метка" : 1582886099642, "unit": "count", "value": "5.5"}], "emr_published_on": 1582886099642} '

1 Ответ

1 голос
/ 02 марта 2020

b'...' просто означает, что вы получили байтовую строку, которую модуль json может успешно обработать. Вы бы получили словарь, который для ключа metrics имеет значение списка словарей. Список может напрямую заполнять DataFrame.

Это означает, что вы можете обработать это так же просто, как:

df = pd.DataFrame(json.loads(body)['metrics'])
...