Сделайте групповые файлы в S3 вместо одного из каждого JSON - NIFI - PullRequest
1 голос
/ 03 ноября 2019

Я получаю результат вызова API, делаю некоторые преобразования и сохраняю его в S3, теперь он сохраняет 1 файл для каждого вызова API. В результате получается множество файлов:> PutS3Object

Формат json:

"data": {"value1": "test", "value2": "test2"},
"actions": [{"buy": 5, "sell": 6},{"buyAgain": 5, "sellAgain": 6}],
"Reactions": [{"buy": 5, "sell": 6}],
{"otherValue": "1",
"otherValue2": "2"}

иногда у действий есть значения внутри, в других caso "действия": [] Я отбрасываю Реакции, используя JoltTransformJSON с параметром удаления, у него есть LOTданных, которые мне не нужны

Чтобы объединить значения, я попробовал MergeContent, но он сбрасывает много записей, сначала я читаю возможные конфигурации, затем ... Я начинаю изменять параметры, чтобы посмотреть, как они меняютсявывод, он всегда удаляет много записей.

Так что теперь я сохраняю 1 файл на json в S3, это много файлов, и вы можете почувствовать это при запросе данных.

Как я могу улучшить поток, чтобы хранить меньше файлов? Спасибо!

---- РЕДАКТИРОВАТЬ: изображение добавлено ----

Текущая конфигурация MergeContents, не совсем понимаю Атрибут стратегии Свойство. Может ли это исправить изменения в схеме? (действия со значением или «действия»: [])

enter image description here

---- РЕДАКТИРОВАНИЕ 2 ---- Теперь я могу подтвердить, что группировкапо состоянию, как и ожидалось, но отбрасывая потоки JSON с «действиями»: [] они имеют то же состояние, что и некоторые потоки с заполненным этим полем, есть идеи? спасибо!

1 Ответ

1 голос
/ 03 ноября 2019

Процессор MergeContent является правильным решением. Установите merge format на Avro, и содержимое Avro потокового файла будет объединено в один потоковый файл. Ваша проблема с удаленными данными связана со свойством стратегии метаданных:

Для FlowFiles, формат ввода которых поддерживает метаданные (например, Avro), это свойство определяет, какие метаданные следует добавить в пакет. Если выбрано «Использовать первые метаданные», будут использоваться ключи / значения метаданных из первого FlowFile, который будет объединен. Если выбран параметр «Сохранить только общие метаданные», будут сохранены только метаданные, которые существуют во всех FlowFiles в пакете с одинаковым значением. Если выбрано «Игнорировать метаданные», метаданные не передаются в исходящий пакетный FlowFile. Если выбран вариант «Не объединять необычные метаданные», любой FlowFile, значения метаданных которого не совпадают со значениями первого связанного FlowFile, не будет объединен.

Flowfiles, схема которого не равна схемепервый связанный файл потока будет удален. Я могу придумать два возможных решения для предотвращения этого:

Использование Correlation Attribute Name для объединения потоковых файлов Avro, имеющих общую схему

Вы должны убедиться, что объединяются только файлы с одинаковымисхемы. Поэтому, если вы можете поместить некоторый атрибут в файл потока, такой как «type = CAR или type = BIKE», вы можете установить для атрибута корреляции значение «type». Затем MergeContent создаст пакеты на основе типа. Поскольку схема файлов в пакете одинакова, записи не будут удаляться.

Установить конкретную схему

Заменить InferAvroSchema и ConvertJsonToAvro одним процессором: ConvertRecord . Сконфигурируйте JsonTreeReader как читателя и оставьте свойства по умолчанию. Сконфигурируйте AvroRecordSetWriter как записывающее устройство и задайте следующие свойства:

enter image description here

В AvroRecordSetWriter выполните следующие настройки Schema text:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "data",
      "type": {
        "name": "data",
        "type": "record",
        "fields": [
          {
            "name": "value1",
            "type": "string"
          },
          {
            "name": "value2",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "actions",
      "type": {
        "type": "array",
        "items": {
          "name": "actions_record",
          "type": "record",
          "fields": [
            {
              "name": "buyAgain",
              "type": ["int", "null"]
            },
            {
              "name": "sellAgain",
              "type": ["int", "null"]
            },
            {
              "name": "buy",
              "type": ["int", "null"]
            },
            {
              "name": "sell",
              "type": ["int", "null"]
            }
          ]
        }
      }
    },
    {
      "name": "Reactions",
      "type": {
        "type": "array",
        "items": {
          "name": "Reactions_record",
          "type": "record",
          "fields": [
            {
              "name": "buy",
              "type": "int"
            },
            {
              "name": "sell",
              "type": "int"
            }
          ]
        }
      }
    }
  ]
}

Обратите внимание, что в действия теперь включены все поля. Если вам нужна помощь для преобразования Json в схему Avro, используйте этот генератор схемы .

PS: если вам нужна дополнительная информация о том, как контролировать количество записей на объединение, нажмитездесь .

...