Интеграция Apache NiFi & Kafka - PullRequest
       17

Интеграция Apache NiFi & Kafka

0 голосов
/ 18 апреля 2019

Я не уверен, что этот вопрос где-то уже адресован, но я не смог найти полезного ответа в Интернете.

Я пытаюсь интегрировать Apache NiFi с Kafka - используя данные из Kafka, используя Apache NiFi. Ниже несколько вопросов, которые приходят мне в голову, прежде чем продолжить с этим.

В-1) У нас есть сценарий использования - чтение данных из Кафки в реальном времени, анализ данных, базовая проверка данных и последующая передача данных в HBase. я знаю Apache NiFi является подходящим кандидатом для выполнения такого рода обработки, но насколько легко построить рабочий процесс, если JSON, который мы обрабатываем, является сложным? Мы были изначально думал сделать то же самое с помощью Java Code, но позже понял, что это можно сделать с минимальными усилиями в NiFi. Обратите внимание, что 80% данных, которые мы обрабатываем Кафка будет простым JSON, но 20% будет сложным (включает массивы)

В-2) Самая хитрая часть при написании Kafka-потребителя - правильно обрабатывать смещение. Как Apache NiFi будет обрабатывать смещения при использовании тем Kafka? Как смещается будет правильно зафиксирован в случае, если во время обработки будет произведена перебалансировка? Фреймворки, такие как Spring-Kafka, предоставляют опции для фиксации смещений (до некоторой степени) в случае ребаланс срабатывает в середине обработки. Как NiFi справляется с этим?

1 Ответ

3 голосов
/ 18 апреля 2019

Я развернул несколько конвейеров в 3-узловом кластере NiFi в производстве, из которых один похож на ваш вариант использования.

Q-1) Очень просто и легко построить конвейер для вашегопотребительная случай.Поскольку вы не упомянули типы задач, связанных с processing json, я предполагаю общие задачи.Общей задачей, включающей JSON, может быть проверка схемы, которая может быть выполнена с использованием ValidateRecord Processor, преобразование с использованием JoltTransformRecord Processor, извлечение значений атрибутов с использованием EvaluateJsonPath, преобразование json в какой-либо другой формат, например, avro, с использованием ConvertJSONToAvro процессоров и т. Д.Nifi дает вам возможность независимо масштабировать каждую стадию / процессор в конвейерах.Например, если преобразование с использованием JoltTransformRecord отнимает много времени, его можно масштабировать для запуска N одновременных задач в каждом узле, настроив Concurrent Tasks на вкладке Scheduling.

Q-2) Насколько *Что касается процессора 1013 *, то управление смещением осуществляется сначала путем фиксации сеанса процессора NiFi, а затем смещения Kafka, что означает, что у нас есть хотя бы одна гарантия по умолчанию.Когда Kafka запускает перебалансировку потребителей для данного раздела, процессор быстро фиксирует (сеанс процессора и смещение Kafka) все, что получил, и возвращает потребителя в пул для повторного использования.

ConsumeKafka_2_0 обрабатывает смещение принятия, когда членыизменение группы потребителей или изменение подписки участников.Это может произойти, когда процессы умирают, добавляются новые экземпляры процессов или старые экземпляры возвращаются к жизни после сбоя.Также позаботились о случаях, когда количество разделов подписанной темы административно корректируется.

...