Нифи JoltTransformRecord UUID в преобразовании по умолчанию не работает должным образом - PullRequest
1 голос
/ 07 января 2020

У меня есть рабочий процесс Nifi, который использует JoltTranformRecord для выполнения каких-либо манипуляций с данными, основанными на записях. Я должен создать значение по умолчанию uuid в каждом сообщении в файле потока. Моя конфигурация JoltTranformRecord следующая:

enter image description here

Спецификация толчка:

[{
    "operation": "shift",
    "spec": {
        "payload": "data.payload"
    }
}, {
    "operation": "default",
    "spec": {
        "header": {
            "source": "${source}",
            "client_id": "${client_id}",
            "uuid": "${UUID()}",
            "payload_type":"${payload_type}"
        }

    }
}]

Операция сдвига и все остальные операции по умолчанию работают нормально как и ожидалось. Но UUID идет одинаково для всех сообщений. Мне нужны разные UUID для каждого сообщения. Я не хочу добавлять другой процессор только для этой цели.

Мой рабочий процесс ниже:

enter image description here

Конфигурации Reader & Writer для Процессор JoltRecord:

IngestionSchemaJsonTreeReader (из процессора JsonTreeReader):

enter image description here

IngestionSchemaAvroRecordSetWriter (из процессора AvroWriter) 10 * * 32 * *enter image description here

В сконфигурированном реестре схем есть определенные ниже схемы.

com.xyz.ingestion.pre_json

{
  "type": "record",
  "name": "event",
  "namespace": "com.xyz.ingestion.raw",
  "doc": "Event ingested to kafka",
  "fields": [
          {
            "name": "payload",
            "type": [
              "null",
              "string"
            ],
            "default": "null"
          }
        ]
  }


com.xyz.ingestion.raw -

  {
  "type": "record",
  "name": "event",
  "namespace": "com.xyz.ingestion.raw",
  "doc": "Event ingested to kafka",
  "fields": [
    {
      "type": {
        "name": "header",
        "type": "record",
        "namespace": "com.xyz.ingestion.raw.header",
        "doc": "Header data for event ingested",
        "fields": [
          {
            "name": "payload_type",
            "type": "string"
          },
          {
            "name": "uuid",
            "type": "string",
            "size": "36"
          },
          {
            "name": "client_id",
            "type": "string"
          },
          {
            "name": "source",
            "type": "string"
          }
        ]
      },
      "name": "header"
    },
    {
      "type": {
        "name": "data",
        "type": "record",
        "namespace": "com.xyz.ingestion.raw.data",
        "doc": "Payload for event ingested",
        "fields": [
          {
            "name": "payload",
            "type": [
              "null",
              "string"
            ],
            "default": "null"
          }
        ]
      },
      "name": "data"
    }
  ]
}

1 Ответ

2 голосов
/ 07 января 2020

Язык выражения оценивается для каждой записи. UUID() выполняется для каждой оценки. Таким образом, uuid должно быть уникальным для каждой записи. Из предоставленной вами информации я не могу понять, почему вы получаете дубликат uuids.

Я попытался воспроизвести вашу проблему следующим образом:

enter image description here

GenerateFlowFile :

enter image description here

Разделить Json: настроить $ как JsonPathExpression для разделения Json массива на записи.

JoltTransformRecord :

enter image description here

Как вы можете видеть способ добавления UUID ничем не отличается от того, как вы это делаете. Но я получаю разные UUID, как и ожидалось:

enter image description here enter image description here

...