Перенос данных из прежних версий в новые system.legay. Системные записи отправляются в Kafka в виде сообщения JSON.
Конвейер основного потока - 1 исходный Kafka, 2 преобразования с использованием плоской карты. Он преобразует данные на основе новой схемы системы.rules.3 дедупликация 4 записывает его в базу данных с помощью функции процесса.
Я определил другой конвейер заданий, снова считывающий из того же источника Kafka, т.е. другой потребитель считывает необработанные данные, подготавливает идентификатор карты - первичный ключ, а значение - JSON, поэтому необработанныйданные составляются в виде карты для каждой сущности.
Как сравнить исходную карту с преобразованными данными.Как мы передаем карту в основной поток для обнаружения анамолии?Должен ли я подготовить трубопровод parellel и не знаете, как это сделать?Пометить недопустимую запись на основе правил, в основном на ключевых полях, и перенести ее в раздел ошибок.Допустимые записи записываются в базу данных.
Цель заключается в проверке способа потоковой передачи записей, а не пакетной обработки.Flink 1.6.0