У меня есть рабочий процесс Nifi, который использует JoltTranformRecord для выполнения каких-либо манипуляций с данными, основанными на записях. Я должен создать значение по умолчанию uuid в каждом сообщении в файле потока. Моя конфигурация JoltTranformRecord следующая:
Спецификация толчка:
[{
"operation": "shift",
"spec": {
"payload": "data.payload"
}
}, {
"operation": "default",
"spec": {
"header": {
"source": "${source}",
"client_id": "${client_id}",
"uuid": "${UUID()}",
"payload_type":"${payload_type}"
}
}
}]
Операция сдвига и все остальные операции по умолчанию работают нормально как и ожидалось. Но UUID идет одинаково для всех сообщений. Мне нужны разные UUID для каждого сообщения. Я не хочу добавлять другой процессор только для этой цели.
Мой рабочий процесс ниже:
Конфигурации Reader & Writer для Процессор JoltRecord:
IngestionSchemaJsonTreeReader (из процессора JsonTreeReader):
IngestionSchemaAvroRecordSetWriter (из процессора AvroWriter) 10 * * 32 * *
В сконфигурированном реестре схем есть определенные ниже схемы.
com.xyz.ingestion.pre_json
{
"type": "record",
"name": "event",
"namespace": "com.xyz.ingestion.raw",
"doc": "Event ingested to kafka",
"fields": [
{
"name": "payload",
"type": [
"null",
"string"
],
"default": "null"
}
]
}
com.xyz.ingestion.raw -
{
"type": "record",
"name": "event",
"namespace": "com.xyz.ingestion.raw",
"doc": "Event ingested to kafka",
"fields": [
{
"type": {
"name": "header",
"type": "record",
"namespace": "com.xyz.ingestion.raw.header",
"doc": "Header data for event ingested",
"fields": [
{
"name": "payload_type",
"type": "string"
},
{
"name": "uuid",
"type": "string",
"size": "36"
},
{
"name": "client_id",
"type": "string"
},
{
"name": "source",
"type": "string"
}
]
},
"name": "header"
},
{
"type": {
"name": "data",
"type": "record",
"namespace": "com.xyz.ingestion.raw.data",
"doc": "Payload for event ingested",
"fields": [
{
"name": "payload",
"type": [
"null",
"string"
],
"default": "null"
}
]
},
"name": "data"
}
]
}