NiFi - CaptureChangeMySQL конвертирует json в формат ["col_name": "col_value"] - PullRequest
0 голосов
/ 15 февраля 2019

пролог

Имя таблицы MySQL: ar_tmp имеет два столбца id int и name int

для выполнения

Я выполняю sql

insert into ar_tmp (id, name) values (1, 4);

и CaptureChangeMySQL захватывает этот CDC и потоковый контент, подобный этому

{
    "type":"insert",
    "timestamp":1550221517000,
    "binlog_filename":"mysql-bin.013920",
    "binlog_position":241518646,
    "database":"platform_data",
    "table_name":"ar_tmp",
    "table_id":2899035,
    "columns":[
        {
            "id":1,
            "name":"id",
            "column_type":4,
            "value":1
        },
        {
            "id":2,
            "name":"name",
            "column_type":4,
            "value":4
        },
        {
            "id":3,
            "value":4
        }
    ]
}

Но я хочу получить результат в этом формате

{
    "type":"insert",
    "timestamp":1550221517000,
    "binlog_filename":"mysql-bin.013920",
    "binlog_position":241518646,
    "database":"platform_data",
    "table_name":"ar_tmp",
    "table_id":2899035,
    "columns":[
        {
            "id":1,
            "name":4
        }
    ]
}

или

{
    "id":1,
    "name":4
}

решение

Это может быть сделано путем жесткого кодирования с использованием jsonPath функция enter image description here

Но, может быть, «трудно» сделатьэто потому, что в каждом столбце используется один и тот же код, который создает избыточность процессора (скажем, 50 столбцов).Хуже того, это опасно, когда имя столбца изменилось.

Есть идеи?

1 Ответ

0 голосов
/ 15 февраля 2019

JoltTransformJSON может помочь вам здесь.

Попробуйте это здесь

Jolt Spec для вашего демо-входа json:

[
  {
    "operation": "shift",
    "spec": {
      "columns": {
        "*": {
          "value": "columns.@(1,name)"
        }
      },
      "*": "&"
    }
  }
]

Результат:

{
  "type" : "insert",
  "timestamp" : 1550221517000,
  "binlog_filename" : "mysql-bin.013920",
  "binlog_position" : 241518646,
  "database" : "platform_data",
  "table_name" : "ar_tmp",
  "table_id" : 2899035,
  "columns" : {
    "id" : 1,
    "name" : 4
  }
}
...