Как получить и сохранить смещение входа kafka в измерении Influxdb - PullRequest
0 голосов
/ 07 ноября 2019

Моя главная цель - получить смещение, которое kafka использует для отправки каждой записи, и сохранить его с данными, которые отправляются в столбец.
Таким образом, я смогу использовать его как uuid, поскольку он каждый раз уникален.
Моя проблема в том, что я не знаю, как включить эти данные.
Моя telegraf конфигурация имеет вид:

- kafka_consumer:
        brokers:
          - "....."
        topics: 
          - "example_topic"
        consumer_group: "telegraf_consumers"
        offset: "newest"
        data_format: "csv"
        ......
        ......
        csv_column_names: 
          - "timestamp"
          - "_id"
          - "bid"
          - "ask"
          - "_type"
        csv_column_types: 
          - "string"
          - "int"
          - "float"
          - "float"
          - "int"
        csv_timestamp_column: "timestamp"
        csv_timestamp_format: "unix"
        csv_measurement_column: "_id"

В результате я хочу получить что-то вроде:

отметка времени _id bid ask _typeсмещение отметки времени
1573137284259187000 14 3,5 4,2 3 1573137284.259187 123456

Что мне нужно добавить, чтобы можно было включить смещение каждой записи, включенной в измерение? Есть ли способ сделать это?

...