Как преобразовать и извлечь поля в коннекторе JDBC приемника Kafka - PullRequest
0 голосов
/ 10 мая 2018

Я использую сторонний инструмент CDC, который реплицирует данные из исходной базы данных в разделы Kafka.Пример строки показан ниже:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

Какая конфигурация необходима в файле приемника для извлечения всех (под) полей в data и headers и игнорирования в полях beforeData, поэтомучто таблица назначения, в которой данные будут передаваться Kafka Sink, будет содержать следующие поля:

USER_ID, USER_CATEGORY, operation, timestamp

Я прошел список преобразования в документах слияния , но не смогнайдите, как использовать их, чтобы достигнуть вышеупомянутой цели.

Ответы [ 2 ]

0 голосов
/ 14 июня 2019

Если вы хотите перечислить конкретные имена полей, вы можете решить эту проблему следующим образом:

  1. Использование преобразования Flatten для свертывания вложения (которое преобразует пути исходной структуры в имена, разделенные точками)
  2. Использование преобразования Заменить с rename, чтобы сделать имена полей такими, какими вы хотите, чтобы сток испускал
  3. Использование другого Замените преобразование на whitelist, чтобы ограничить количество полей, выбранных вами

В вашем случае это может выглядеть так:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
0 голосов
/ 11 мая 2018

Я думаю, вы хотите ExtractField, и, к сожалению, это операция Map.get, поэтому 1) вложенные поля не могут быть получены за один проход 2) для нескольких полей требуется несколько преобразований.

При этом вы можете попробовать это (не проверено)

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

Если это не сработает, вам лучше реализовать собственный пакет Transformations, который может по крайней мере отбрасывать значенияиз структуры / карты.

...