Apache flink - переместить файлы в другую папку после прочтения - PullRequest
0 голосов
/ 27 августа 2018

Я читаю CSV-файлы из каталога и выполняю некоторую обработку. Прямо сейчас flink просто выбирает любой новый файл, который входит в этот каталог, и обрабатывает его. Это работает нормально для меня.

Я застрял в 2 выпусках:

  1. Я хочу записать имена файлов, которые Flink завершил обработку.
  2. Я хочу переместить обработанные файлы в другую папку, как только Flink завершит обработку.

Мой фрагмент кода:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);

RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());

DataStream<Row> inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
                parseInt(folderLookupTime));

1 Ответ

0 голосов
/ 27 августа 2018

Эта тема пару раз поднималась в списке рассылки Flink - см. Обсуждение здесь и здесь - но краткий итог заключается в том, что еще нет простого способ сделать это изнутри Флинк.

То, что обычно делается, - это использовать задание cron для периодического перемещения старых файлов из отслеживаемого каталога, предполагая, что они были обработаны. Если вы хотите быть более осторожным, вам нужно будет внедрить свой собственный механизм для отслеживания хода выполнения работы по обработке. Упомянутые выше темы электронной почты содержат некоторые идеи о том, как это сделать.

...