Привет всем, я получаю поток сообщений от темы Кафки в NiFi, которую я читаю через потребительский процесс.Сообщение имеет формат json (фиктивные значения json, формат json такой же, как и у оригинала):
{ "schema": {
"type": "struct",
"name": "emp_table",
"fields": [
{
"field": "emp_id",
"type": "string"
},
{
"field": "emp_name",
"type": "String"
},
{
"field": "city",
"type": "string"
},
{
"field": "emp_sal",
"type": "string"
},
{
"field": "manager_name",
"type": "string"
}
] }, "payload": {
"emp_id": "1",
"emp_name": "abc",
"city": "NYK",
"emp_sal": "100000",
"manager_name": "xyz" } }
Как вы можете видеть, фактическое имя таблицы находится в схеме, а значения столбцов - в полезной нагрузке.Я могу анализировать значения столбцов и помещать их в таблицу Hbase с помощью процессоров EvaluateJsonPath и PutHBaseJson в NiFi.
То, что я могу достичь, - это вручную ввести имя таблицы и идентификатор строки.Но моя проблема в том, что я хочу получить имя таблицы (в приведенном выше примере emp_table) и rowid (в приведенном выше примере emp_id) из json и во время выполнения предоставить эти значения процессору PutHbaseJson в NiFi вместе со значениями столбцов.