Kafka Connect: как разобрать строку на карте - PullRequest
0 голосов
/ 13 апреля 2019

Предположим, у меня есть файл, заполненный JSON объектами / строками, разделенными символами новой строки (\n).Когда коннектор на основе FileStreamSource будет читать этот файл, он будет рассматривать каждую строку как java.lang.String.

Как можно проанализировать java.lang.String в java.util.Map или struct , чтобы выполнить дальнейшие преобразования (например, замаскировать поле с помощью MaskField или извлечь полеиспользуя ExtractField )?

PS: вопрос не в том, как разобрать некоторые java.lang.String в java.util.Map или struct , а в том, как интегрировать такую ​​логику синтаксического анализа с Kafka (пользовательское преобразование Кафки?)или получить тот же результат другими способами (например, настроить что-то в Kafka или использовать определенный соединитель / преобразование и т. д.)

Ответы [ 2 ]

0 голосов
/ 15 апреля 2019

Как сказано в документации Apache Kafka, FileStreamSource не совсем поддерживаемый производственный соединитель ...

Возможно, вам лучше использовать соединитель spooldir, который поддерживает JSON с разделителями строк.https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/README.md

0 голосов
/ 14 апреля 2019

Существует два возможных способа:

  1. Вы можете использовать Confluent Platform и запустить свой коннектор с соответствующим запросом KSQL (https://docs.confluent.io/current/ksql/docs/tutorials/index.html#ksql-tutorials).
  2. Вы можете ускорить поток Kafkaapp (https://kafka.apache.org/documentation/streams/) вместе с вашим исходным соединителем. Потоковое приложение будет читать сообщения из темы / -ов, в которые ваш соединитель помещает сообщения. Вам необходимо реализовать логику преобразования в потоковом приложении Kafka. Когда сообщение являетсяобработанное потоковое приложение помещает его в тему вывода. Ниже приведен пример структуры кода потокового приложения.
Properties props = new Properties();

...

final StreamsBuilder builder = new StreamsBuilder();
Pattern pattern = Pattern.compile(<YOUR_INPUT_TOPIC_PATTERN>);
KStream<String, String> source = builder.stream(pattern);

...

source.mapValues((k,v) -> {
     Gson gson = new Gson();
     Map map = gson.fromJson(v, Map.class);

     // here is your transformation logi

     return v;
}).to(<YOUR_OUTPUT_TOPIC>);

...

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

...

streams.start();
...