Я хочу обрабатывать файлы потоком flink, в котором две строки принадлежат друг другу. В первой строке есть заголовок, а во второй строке соответствующий текст.
Файлы находятся в моей локальной файловой системе. Я использую метод readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
с пользовательским FileInputFormat
.
Мой класс задания потоковой передачи выглядит следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Read> inputStream = env.readFile(new ReadInputFormatTest("path/to/monitored/folder"), "path/to/monitored/folder", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
inputStream.print();
env.execute("Flink Streaming Java API Skeleton");
, а мой ReadInputFormatTest
выглядит так:
public class ReadInputFormatTest extends FileInputFormat<Read> {
private transient FileSystem fileSystem;
private transient BufferedReader reader;
private final String inputPath;
private String headerLine;
private String readLine;
public ReadInputFormatTest(String inputPath) {
this.inputPath = inputPath;
}
@Override
public void open(FileInputSplit inputSplit) throws IOException {
FileSystem fileSystem = getFileSystem();
this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(inputSplit.getPath())));
this.headerLine = reader.readLine();
this.readLine = reader.readLine();
}
private FileSystem getFileSystem() {
if (fileSystem == null) {
try {
fileSystem = FileSystem.get(new URI(inputPath));
} catch (URISyntaxException | IOException e) {
throw new RuntimeException(e);
}
}
return fileSystem;
}
@Override
public boolean reachedEnd() throws IOException {
return headerLine == null;
}
@Override
public Read nextRecord(Read r) throws IOException {
r.setHeader(headerLine);
r.setSequence(readLine);
headerLine = reader.readLine();
readLine = reader.readLine();
return r;
}
}
Как и ожидалось, заголовки и текст хранятся вместе в одном объекте. Тем не менее, файл читается восемь раз. Таким образом, проблема заключается в распараллеливании. Где и как я могу указать, что файл обрабатывается только один раз, а несколько файлов параллельно? Или я должен изменить свой пользовательский FileInputFormat
еще дальше?