Я тестирую NiFi, чтобы заменить нашу текущую конфигурацию приема, которая импортирует данные из нескольких сегментов MySQL таблицы и сохраняет их в HDFS.
Я использую GenerateTableFetch
и ExecuteSQL
для достижения этой цели.
Каждый файл входящего потока будет иметь атрибут database.name
, который используется DBCPConnectionPoolLookup
для выбора соответствующего шарда.
Проблема в том, что, скажем, у меня есть 2 шарда для извлечения данных, shard_1
и shard_2
для учетных записей таблиц, а также у меня updated_at
как Maximum Value Columns
, состояние не сохраняется для table@updated_at
за осколок. В состоянии только одна запись в таблице.
Когда я проверяю в Data Provenance, я вижу, что файл потокового файла shard_2 удаляется без передачи в ExecuteSQL. И я думаю, это потому, что сначала выполняется запрос shard_1, а затем, когда приходит запрос shard_2, его записи проверяются по update_at shard_1, и, поскольку он возвращает пустой, он удаляет файл.
Кто-нибудь сталкивался с этой проблемой? Или я что-то упустил?