Читайте две строки файла одновременно в процессе потоковой передачи - PullRequest
1 голос
/ 04 ноября 2019

Я хочу обрабатывать файлы потоком flink, в котором две строки принадлежат друг другу. В первой строке есть заголовок, а во второй строке соответствующий текст.

Файлы находятся в моей локальной файловой системе. Я использую метод readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) с пользовательским FileInputFormat.

Мой класс задания потоковой передачи выглядит следующим образом:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Read> inputStream = env.readFile(new ReadInputFormatTest("path/to/monitored/folder"), "path/to/monitored/folder", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
inputStream.print();
env.execute("Flink Streaming Java API Skeleton");

, а мой ReadInputFormatTest выглядит так:

public class ReadInputFormatTest extends FileInputFormat<Read> {

    private transient FileSystem fileSystem;
    private transient BufferedReader reader;
    private final String inputPath;
    private String headerLine;
    private String readLine;

    public ReadInputFormatTest(String inputPath) {
        this.inputPath = inputPath;
    }

    @Override
    public void open(FileInputSplit inputSplit) throws IOException {
        FileSystem fileSystem = getFileSystem();
        this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(inputSplit.getPath())));
        this.headerLine = reader.readLine();
        this.readLine = reader.readLine();
    }

    private FileSystem getFileSystem() {
        if (fileSystem == null) {
            try {
                fileSystem = FileSystem.get(new URI(inputPath));
            } catch (URISyntaxException | IOException e) {
                throw new RuntimeException(e);
            }
        }
        return fileSystem;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return headerLine == null;
    }

    @Override
    public Read nextRecord(Read r) throws IOException {
        r.setHeader(headerLine);
        r.setSequence(readLine);

        headerLine = reader.readLine();
        readLine = reader.readLine();

        return r;
    }
}

Как и ожидалось, заголовки и текст хранятся вместе в одном объекте. Тем не менее, файл читается восемь раз. Таким образом, проблема заключается в распараллеливании. Где и как я могу указать, что файл обрабатывается только один раз, а несколько файлов параллельно? Или я должен изменить свой пользовательский FileInputFormat еще дальше?

1 Ответ

0 голосов
/ 04 ноября 2019

Я бы изменил ваш источник так, чтобы он выдавал доступные имена файлов (вместо фактического содержимого файла), а затем добавил новый процессор, чтобы прочитать имя из входного потока, а затем испустить пары строк. Другими словами, разделите текущий источник на источник, за которым следует процессор. Процессор можно заставить работать с любой степенью параллелизма, и источником будет один экземпляр.

...