Обнаружение анамолии в режиме реального времени - PullRequest
0 голосов
/ 23 января 2019

Перенос данных из прежних версий в новые system.legay. Системные записи отправляются в Kafka в виде сообщения JSON.

Конвейер основного потока - 1 исходный Kafka, 2 преобразования с использованием плоской карты. Он преобразует данные на основе новой схемы системы.rules.3 дедупликация 4 записывает его в базу данных с помощью функции процесса.

Я определил другой конвейер заданий, снова считывающий из того же источника Kafka, т.е. другой потребитель считывает необработанные данные, подготавливает идентификатор карты - первичный ключ, а значение - JSON, поэтому необработанныйданные составляются в виде карты для каждой сущности.

Как сравнить исходную карту с преобразованными данными.Как мы передаем карту в основной поток для обнаружения анамолии?Должен ли я подготовить трубопровод parellel и не знаете, как это сделать?Пометить недопустимую запись на основе правил, в основном на ключевых полях, и перенести ее в раздел ошибок.Допустимые записи записываются в базу данных.

Цель заключается в проверке способа потоковой передачи записей, а не пакетной обработки.Flink 1.6.0

...