Процессор Nifi MergeRecord для слияния нулевых значений - PullRequest
0 голосов
/ 11 мая 2018

Я делю список полей и пытаюсь объединить их в конце.У меня есть 2 вида полей, стандартное поле и настраиваемое поле.Способ обработки пользовательских полей отличается от стандартных полей.

{
 "standardfield1" : "fieldValue1",
  "customField1" : "customValue"
}

Они должны быть переведены в

{ 
  "standardfield1" : "fieldValue1",
  "customFields" : [
   { "type" : "customfield",
     "id" : 1212 //this is id of the customField1, retrieved at run time
     "value" :  "customValue"
   } ]
}

Моя схема mergeRecord установлена ​​на

{
  "name": "custom field",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "value", "type": "string" }

  ]
}

И в соответствии с моими потребностями я устанавливаю содержимое стандартного поля для нового атрибута потока, поскольку я могу извлечь его из него, и помещаю пустое значение в содержимое потока файла.

Итак, и пользовательские поля, истандартные поля подключены к процессору mergeRecord.

он работает довольно хорошо, если в полезной нагрузке доступны настраиваемые поля.Если есть только стандартные поля и нет настраиваемых полей, то процессор mergeRecord не будет сливать что-либо, а также не потерпит неудачу, он просто генерирует исключение NullPointerException, и там поток-файл застрял в очереди навсегда.

Я хочу заставить процессор mergeRecord сливатьсядаже пустые файлы потока контента.

Любая помощь будет оценена

1 Ответ

0 голосов
/ 11 мая 2018

Я не уверен, что полностью понимаю ваш вариант использования, но для вашего ввода выше, если вы извлекли / заполнили идентификатор для customField1 в атрибут (назовем его myId), тогда вы могли бы использовать JoltTransformJSON , чтобы получить желаемый результат выше, используя эту спецификацию цепи:

[
  {
    "operation": "shift",
    "spec": {
      "standardfield1": "standardfield1",
      "customField*": {
        "@": "customFields.[&(1,1)].value",
        "#customfield": "customFields.[&(1,1)].type",
        "#${myId}": "customFields.[&(1,1)].id"
      }
    }
  },
  {
    "operation": "remove",
    "spec": {
      "customFields": {
        "0": ""
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "customFields": {
        "*": {
          "id": "=toInteger"
        }
      }
    }
  }
]

Это создаст массив customFields, если присутствует customField, и заполнит его значениями, которые у вас есть выше (включая значение атрибута myId). Вы можете изменить настройки (например, добавить спецификацию по умолчанию в вышеупомянутую цепочку), чтобы добавить пустой массив для customFields, если хотите (чтобы схема была довольна, например). Если я неправильно понял, что вы пытаетесь сделать, пожалуйста, дайте мне знать, и я сделаю все возможное, чтобы помочь.

...