Процессор MergeContent является правильным решением. Установите merge format
на Avro
, и содержимое Avro потокового файла будет объединено в один потоковый файл. Ваша проблема с удаленными данными связана со свойством стратегии метаданных:
Для FlowFiles, формат ввода которых поддерживает метаданные (например, Avro), это свойство определяет, какие метаданные следует добавить в пакет. Если выбрано «Использовать первые метаданные», будут использоваться ключи / значения метаданных из первого FlowFile, который будет объединен. Если выбран параметр «Сохранить только общие метаданные», будут сохранены только метаданные, которые существуют во всех FlowFiles в пакете с одинаковым значением. Если выбрано «Игнорировать метаданные», метаданные не передаются в исходящий пакетный FlowFile. Если выбран вариант «Не объединять необычные метаданные», любой FlowFile, значения метаданных которого не совпадают со значениями первого связанного FlowFile, не будет объединен.
Flowfiles, схема которого не равна схемепервый связанный файл потока будет удален. Я могу придумать два возможных решения для предотвращения этого:
Использование Correlation Attribute Name
для объединения потоковых файлов Avro, имеющих общую схему
Вы должны убедиться, что объединяются только файлы с одинаковымисхемы. Поэтому, если вы можете поместить некоторый атрибут в файл потока, такой как «type = CAR или type = BIKE», вы можете установить для атрибута корреляции значение «type». Затем MergeContent создаст пакеты на основе типа. Поскольку схема файлов в пакете одинакова, записи не будут удаляться.
Установить конкретную схему
Заменить InferAvroSchema
и ConvertJsonToAvro
одним процессором: ConvertRecord . Сконфигурируйте JsonTreeReader
как читателя и оставьте свойства по умолчанию. Сконфигурируйте AvroRecordSetWriter
как записывающее устройство и задайте следующие свойства:
![enter image description here](https://i.stack.imgur.com/YVTXZ.png)
В AvroRecordSetWriter
выполните следующие настройки Schema text
:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "data",
"type": {
"name": "data",
"type": "record",
"fields": [
{
"name": "value1",
"type": "string"
},
{
"name": "value2",
"type": "string"
}
]
}
},
{
"name": "actions",
"type": {
"type": "array",
"items": {
"name": "actions_record",
"type": "record",
"fields": [
{
"name": "buyAgain",
"type": ["int", "null"]
},
{
"name": "sellAgain",
"type": ["int", "null"]
},
{
"name": "buy",
"type": ["int", "null"]
},
{
"name": "sell",
"type": ["int", "null"]
}
]
}
}
},
{
"name": "Reactions",
"type": {
"type": "array",
"items": {
"name": "Reactions_record",
"type": "record",
"fields": [
{
"name": "buy",
"type": "int"
},
{
"name": "sell",
"type": "int"
}
]
}
}
}
]
}
Обратите внимание, что в действия теперь включены все поля. Если вам нужна помощь для преобразования Json в схему Avro, используйте этот генератор схемы .
PS: если вам нужна дополнительная информация о том, как контролировать количество записей на объединение, нажмитездесь .