Я читаю CSV-файлы из каталога и выполняю некоторую обработку.
Прямо сейчас flink просто выбирает любой новый файл, который входит в этот каталог, и обрабатывает его. Это работает нормально для меня.
Я застрял в 2 выпусках:
- Я хочу записать имена файлов, которые Flink завершил обработку.
- Я хочу переместить обработанные файлы в другую папку, как только Flink завершит обработку.
Мой фрагмент кода:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);
RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());
DataStream<Row> inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
parseInt(folderLookupTime));