Парсинг входящего сообщения Json, поступающего из потока NiFi, в таблицу Hbase - PullRequest
0 голосов
/ 30 декабря 2018

Привет всем, я получаю поток сообщений от темы Кафки в NiFi, которую я читаю через потребительский процесс.Сообщение имеет формат json (фиктивные значения json, формат json такой же, как и у оригинала):

{   "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      {
        "field": "emp_id",
        "type": "string"
      },
      {
        "field": "emp_name",
        "type": "String"
      },
      {
        "field": "city",
        "type": "string"
      },
      {
        "field": "emp_sal",
        "type": "string"
      },
      {
        "field": "manager_name",
        "type": "string"
      }
    ]   },   "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"   } }

Как вы можете видеть, фактическое имя таблицы находится в схеме, а значения столбцов - в полезной нагрузке.Я могу анализировать значения столбцов и помещать их в таблицу Hbase с помощью процессоров EvaluateJsonPath и PutHBaseJson в NiFi.

То, что я могу достичь, - это вручную ввести имя таблицы и идентификатор строки.Но моя проблема в том, что я хочу получить имя таблицы (в приведенном выше примере emp_table) и rowid (в приведенном выше примере emp_id) из json и во время выполнения предоставить эти значения процессору PutHbaseJson в NiFi вместе со значениями столбцов.

enter image description here

1 Ответ

0 голосов
/ 02 января 2019

У вас должно получиться добавить еще одно выражение пути JSON в EvaluateJsonPath с помощью чего-то вроде:

table = $.schema.name

Затем в PutHBaseJson создайте имя таблицы $ {table} или любое другое имя, которое вы назвали в EvaluateJsonPath.

...