Jolt Spec - Как вставить элемент в массив - PullRequest
0 голосов
/ 17 апреля 2019

Как вставить постоянный элемент, не требующий ввода json, в массив?

Мое намерение

Я перехватил MySQL CDC, который отформатирован в JSON, и добавил новый столбец, в котором указано время binlog.

А затем преобразовать JSON в AVRO, поэтому мне нужно автоматически генерировать avsc (CDC включает информацию о типе столбцов) в случае изменения таблицы MySQL.

Теперь мне не удалось вставить элемент в $.fields[] в avsc

.

Input

{
  "database": "test",
  "es": 1555381078000,
  "table": "table_name",
  "mysqlType": {
    "bool_type": "tinyint(1)",
    "tinyint_type": "tinyint(4)",
    "SMALLINT_type": "smallint(6)",
    "MEDIUMINT_type": "mediumint(9)",
    "int_type": "int(11)",
    "integer_type": "int(11)",
    "bigint_type": "bigint(20)",
    "float_type": "float",
    "double_type": "double",
    "decimal_type": "decimal(10,0)",
    "decimal_type2": "decimal(20,20)",
    "varchar_type": "varchar(20)",
    "date_type": "date",
    "time_type": "time",
    "datetime_type": "datetime",
    "timestamp_type": "timestamp"
  }
}

Текущая спецификация

[
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        },
        "#__binlog_time": "schema.fields[#2].name",
        "#null": "schema.fields[#2].type[]",
        "#long": "schema.fields[#2].type[]"
      }
    }
    }
]

токовый выход

Текущая спецификация толчка сделала это неправильно, поместив новый элемент в элемент $.fields[]

{
  "schema" : {
    "type" : "record",
    "doc" : "auto generated by jolt",
    "namespace" : "test",
    "name" : "table_name",
    "fields" : [ {
      "name" : "bool_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "tinyint_type",
      "type" : [ "null", "int" ]
    }, { // wrong there
      "name" : [ "__binlog_time", "SMALLINT_type" ],
      "type" : [ "null", "long", "null", "int" ]
    }, {
      "name" : "MEDIUMINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "int_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "integer_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "bigint_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "float_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "double_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type2",
      "type" : [ "null", "long" ]
    }, {
      "name" : "varchar_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "date_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "time_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "datetime_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "timestamp_type",
      "type" : [ "null", "long" ]
    } ]
  }
}

Требуемый вывод

Вставляет элемент {"name":"new_column","type":["null","string"]} в массив $.fields[]

{
  "schema" : {
    "type" : "record",
    "doc" : "auto generated by jolt",
    "namespace" : "test",
    "name" : "table_name",
    "fields" : [ {
      "name" : "bool_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "tinyint_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "SMALLINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "MEDIUMINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "int_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "integer_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "bigint_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "float_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "double_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type2",
      "type" : [ "null", "long" ]
    }, {
      "name" : "varchar_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "date_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "time_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "datetime_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "timestamp_type",
      "type" : [ "null", "long" ]
    }, { // new element(but no need be the last element)
      "name" : "__binlog_time",
      "type" : [ "null", "long" ]
    } ]
  }
}

1 Ответ

1 голос
/ 17 апреля 2019

Вы можете добавить новое поле к входному JSON со спецификацией default, а затем обновить свой "дополнительный длинный" сопоставитель, включив в него long:

[
  {
    "operation": "default",
    "spec": {
      "mysqlType": {
        "binlog_time": "long"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp|long": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        }
      }
    }
  }
]
...