Чтение файла, который добавляется в Flink - PullRequest
0 голосов
/ 21 марта 2020

У нас есть устаревшее приложение, которое записывает результаты в виде записей в некоторые локальные файлы. Мы хотим обрабатывать эти записи в режиме реального времени, поэтому мы планируем использовать Flink в качестве движка. Я знаю, что могу читать текстовые файлы, используя StreamingExecutionEnvironment#readFile. Кажется, нам нужно что-то похожее на PROCESS_CONTINUOUSLY, но этот флаг вызывает повторную обработку всего файла при каждом изменении, а здесь это не то, что нам нужно.

Конечно, я могу написать свой собственный источник, который сохраняет количество записей на файл в своем состоянии. Но я полагаю, что может быть какая-то проблема с таким подходом с контрольными точками или что-то в этом роде - я считаю, что если бы это было легко реализовать надежно, это было бы уже реализовано в Flink.

Любые советы / предложения, как подойти к этому?

1 Ответ

0 голосов
/ 21 марта 2020

Вы можете сделать это довольно легко с пользовательским источником, если вы читаете контент из одного файла (для экземпляра источника). Вам нужно будет использовать состояние оператора и реализовать контрольные точки. Обработка состояния и контрольная точка будут выглядеть примерно так:

public class CheckpointedFileSource implements SourceFunction<Event>, ListCheckpointed<Long> {
    private long eventCnt = 0;

    public void run(SourceContext<Event> sourceContext) throws Exception {
        final Object lock = sourceContext.getCheckpointLock();

        // skip over previously emitted events
        ...

        while (not cancelled) {
            read event from file;

            synchronized (lock) {
                eventCnt++;
                sourceContext.collectWithTimestamp(event, timestamp);
            }

        }
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return Collections.singletonList(eventCnt);
    }

    @Override
    public void restoreState(List<Long> state) throws Exception {
        for (Long s : state)
            this.eventCnt = s;
    }

}

Полный пример см. В контрольном источнике данных поездки на такси , используемом в тренировочных упражнениях Flink. Вам придется немного его адаптировать, поскольку он предназначен для чтения файла stati c, а не того, к которому добавляется.

...