Apache NiFi: JoltJSONTransform для обновления списка JSON ?, реальная проблема: избыток процессоров - PullRequest
0 голосов
/ 17 января 2020

Прежде всего, спасибо за вашу помощь.

Я работаю с Apache NiFi, преобразовывая список JSONS в одном FlowFile в несколько FlowFiles каждый с одним JSON.

Затем я использую JoltJSONTransform для обновления ключа, отсутствующего в json, присутствующего в атрибутах (имя файла)

Моя проблема в том, что мне потребовалось много времени, потому что я работа с большими файлами. Моя следующая задача - попытаться изменить ключ в каждом элементе, присутствующем в списке JSON, прежде чем я разделю его на несколько потоковых файлов.

Мои данные выглядят примерно так:

[
{ "number": "1",
  "pokemon":"Bulbasaur",
  "type":"plant"
},
{ "number": "4",
  "pokemon":"Charmander",
  "type":"fire"
},
{ "number": "7",
  "pokemon":"Squirtle",
  "type":"water"
}
]

И я пытаюсь добавить ключ: значение "filename": "pokemon.csv". Тот же ключ к каждому диктовку в списке ...

И это моя лучшая попытка, я думаю ... best try

Кто-нибудь знает, как я могу сделать это?

Прежде всего, я понятия не имею об использовании сценариев в Nifi: (

Отредактировано: Моя проблема заключается в избытке процессоров в NiFi. Я должен использовать python скрипт в NiFi, чтобы заменить 6 процессоров только одним, и теперь он работает достаточно хорошо. Спасибо за ваше время

1 Ответ

0 голосов
/ 20 января 2020

Split Json загрузит весь файл в память, создаст меньшие (split) потоковые файлы и зафиксирует их в отношении Splits. Метаданные для всех этих меньших потоковых файлов хранятся в пространстве динамической памяти, что может привести к снижению производительности и даже к ошибкам OOM (Out of Memory). Я полагаю, что есть две распространенные практики, чтобы исправить это.

Во-первых, при разбиении больших файлов вы можете использовать серию процессоров SplitText для разделения исходных данных, например, каждые 10.000 строк. Затем второй процессор SplitText разбивает эти куски на конечный размер. Это уменьшит объем занимаемой кучи памяти.

Во-вторых, вы можете использовать процессоры с поддержкой записи, такие как SplitRecord, со связанными с ними модулями записи / чтения. Они были разработаны, чтобы избежать проблемы множественного разделения.


Решение действительно зависит от вашего размера Json. Вот начало:

  • Использование SplitRecord для разбиения Json списка на куски (например, 1000)
  • Использование JoltTransform JSON для преобразования кусков
  • Используйте MergeContent для объединения кусков обратно (необязательно в зависимости от вашего варианта использования)

Общий поток:

enter image description here

GenerateFlowFile :

enter image description here

SplitRecord:

enter image description here

Создайте JsonTreeReader и JsonRecordSetWriter, оставьте настройки по умолчанию. Настройте размер разделения. Я установил его на два, потому что в моем списке три объекта, и я хочу видеть два фрагмента.

JoltTransform JSON:

enter image description here

Толчок c:

[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "*": {
        "filename": "filename.csv"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "*": {
        "*": "[&1].&"
      }
    }
  }
]

MergeRecord :

enter image description here

Создайте JsonTreeReader и JsonRecordSetWriter, оставьте настройки по умолчанию. Я установил Minimum Number of Records на три, потому что я хочу, чтобы все записи были объединены.

Результат :

enter image description here

...