Дамп серии обратно в InfluxDB после запроса с замененным значением поля - PullRequest
0 голосов
/ 05 сентября 2018

Сценарий

Я хочу отправить данные в MQTT Broker (Облако), запросив измерения из InfluxDB.

У меня есть поле в схеме, которое называется status. Это может быть 1 или 0. status=0 указывает, что серия не была отправлена ​​в облако. Если я получу подтверждение от брокера MQTT, я хочу переписать запрос обратно в базу данных с помощью status=1.

Как упоминалось в Часто задаваемые вопросы для InfluxDB в отношении дублирующихся данных Если информация имеет ту же временную метку, что и предыдущий запрос, но с другим значением поля =>, тогда будет отображено поле обновления.

Чтобы проверить это, я создал следующее:

CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300

запрос:

SELECT * FROM meas_1

обеспечивает

time                status type value         
1536157064275338300 0      t1   234      

теперь, если я хочу перезаписать серии, я делаю следующее:

INSERT meas_1, type=t1, status=1,value=123 1536157064275338300                                                                       

, который перезапишет серию

 time                status type value         
 1536157064275338300 1      t1   234     

(Примечание: это невозможно через Теги в настоящее время в InfluxDB)

Использование

  1. Запросить некоторую информацию с помощью клиента с помощью "status"=0.
  2. Реструктурируйте JSON для отправки в облако
  3. Отправить информацию в облако
  4. В случае успеха запишите выходные данные из шага 1. обратно в БД, но с status=1.

Я использую InfluxDBClient Python3 для создания приложения (MQTT + InfluxDB)

В write_points API есть параметр, который упоминает batch_size, для которого требуется int в качестве ввода.

Я не уверен, как я могу использовать это с приложением, которое я хочу. Может ли кто-нибудь направить меня с этим или со Схемой БД, чтобы я мог загружать актуальную и не избыточную информацию в облако?

1 Ответ

0 голосов
/ 14 сентября 2018

batch_size - это фактически длина списка измерений, который необходимо передать в write_points.

Steps

  1. Создание клиента и запрос из измерения (здесь мы запрашиваем информацию GPS)

    client = InfluxDBClient(database='dummy')
    
    op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
    
  2. Включите ResultSet в список:

     batch = list(op.get_points('gps'))
    
  3. создать пустой список для обновления

     updated_batch = []
    
  4. анализирует каждое измерение и устанавливает флаг status на 1. Обратите внимание, что значения по умолчанию в InfluxDB: float

       for each in batch:
    new_mes = {
    'measurement': 'gps',
    'tags': {
    'type': 'gps'
    },
    'time': each['time'],
    'fields': {
      'lat': float(each['lat']),
      'lon': float(each['lon']),
      'alt': float(each['alt']),
      'status': float(1)
    }
    }
    updated_batch.append(new_mes)
    
  5. Наконец сбросьте очки обратно через клиента с batch_size как длина updated_batch

    client.write_points(updated_batch, batch_size=len(updated_batch))
    

Это перезаписывает серию, потому что она содержит те же временные метки с полем status, установленным на 1

...