Я должен был сравнить различные способы. Я прокомментирую мои выводы, но, вкратце, самый быстрый способ - это использовать простой старый BufferedInputStream поверх FileInputStream. Если нужно прочитать много файлов, то три потока сократят общее время выполнения примерно до половины, но добавление большего количества потоков будет постепенно снижать производительность, пока не потребуется три раза больше времени для завершения с двадцатью потоками, чем с одним потоком.
Предполагается, что вы должны прочитать файл и сделать что-то осмысленное с его содержимым. В приведенных здесь примерах чтение строк из журнала и подсчет тех, которые содержат значения, превышающие определенный порог. Так что я предполагаю, что однострочная Java 8 Files.lines(Paths.get("/path/to/file.txt")).map(line -> line.split(";"))
не вариант.
Я тестировал на Java 1.8, Windows 7 и SSD и жестких дисках.
Я написал шесть разных реализаций:
rawParse : использовать BufferedInputStream поверх FileInputStream, а затем обрезать строки, читая побайтно. Это превзошло любой другой однопоточный подход, но может быть очень неудобно для файлов, не относящихся к ASCII.
lineReaderParse : использовать BufferedReader поверх FileReader, читать строку за строкой, разбивать строки, вызывая String.split (). Это примерно на 20% медленнее, чем rawParse.
lineReaderParseParallel : Это то же самое, что lineReaderParse, но в нем используются несколько потоков. Это самый быстрый вариант в целом во всех случаях.
nioFilesParse : использовать java.nio.files.Files.lines ()
nioAsyncParse : использование AsynchronousFileChannel с обработчиком завершения и пулом потоков.
nioMemoryMappedParse : использовать отображенный в памяти файл. Это действительно плохая идея, поскольку время выполнения по крайней мере в три раза больше, чем у любой другой реализации.
Это среднее время чтения 204 файлов по 4 МБ каждый на четырехъядерном диске i7 и SSD. Файлы создаются на лету, чтобы избежать кэширования диска.
rawParse 11.10 sec
lineReaderParse 13.86 sec
lineReaderParseParallel 6.00 sec
nioFilesParse 13.52 sec
nioAsyncParse 16.06 sec
nioMemoryMappedParse 37.68 sec
Я обнаружил разницу, меньшую, чем я ожидал, при работе на SSD или жестком диске, поскольку SSD работает примерно на 15% быстрее. Это может быть связано с тем, что файлы создаются на нефрагментированном жестком диске и считываются последовательно, поэтому вращающийся диск может работать почти как SSD.
Я был удивлен низкой производительностью реализации nioAsyncParse. Либо я реализовал что-то неправильно, либо многопоточная реализация с использованием NIO и обработчик завершения выполняет то же самое (или даже хуже), чем однопоточная реализация с API java.io. Более того, асинхронный анализ с CompletionHandler намного длиннее в строках кода и сложен для правильной реализации, чем прямая реализация на старых потоках.
Теперь за шестью реализациями следует класс, содержащий их все, плюс параметризуемый метод main (), который позволяет воспроизводить количество файлов, размер файла и степень параллелизма. Обратите внимание, что размер файлов варьируется плюс минус 20%. Это позволяет избежать какого-либо эффекта, поскольку все файлы имеют одинаковый размер.
rawParse
public void rawParse(final String targetDir, final int numberOfFiles) throws IOException, ParseException {
overrunCount = 0;
final int dl = (int) ';';
StringBuffer lineBuffer = new StringBuffer(1024);
for (int f=0; f<numberOfFiles; f++) {
File fl = new File(targetDir+filenamePreffix+String.valueOf(f)+".txt");
FileInputStream fin = new FileInputStream(fl);
BufferedInputStream bin = new BufferedInputStream(fin);
int character;
while((character=bin.read())!=-1) {
if (character==dl) {
// Here is where something is done with each line
doSomethingWithRawLine(lineBuffer.toString());
lineBuffer.setLength(0);
}
else {
lineBuffer.append((char) character);
}
}
bin.close();
fin.close();
}
}
public final void doSomethingWithRawLine(String line) throws ParseException {
// What to do for each line
int fieldNumber = 0;
final int len = line.length();
StringBuffer fieldBuffer = new StringBuffer(256);
for (int charPos=0; charPos<len; charPos++) {
char c = line.charAt(charPos);
if (c==DL0) {
String fieldValue = fieldBuffer.toString();
if (fieldValue.length()>0) {
switch (fieldNumber) {
case 0:
Date dt = fmt.parse(fieldValue);
fieldNumber++;
break;
case 1:
double d = Double.parseDouble(fieldValue);
fieldNumber++;
break;
case 2:
int t = Integer.parseInt(fieldValue);
fieldNumber++;
break;
case 3:
if (fieldValue.equals("overrun"))
overrunCount++;
break;
}
}
fieldBuffer.setLength(0);
}
else {
fieldBuffer.append(c);
}
}
}
lineReaderParse
public void lineReaderParse(final String targetDir, final int numberOfFiles) throws IOException, ParseException {
String line;
for (int f=0; f<numberOfFiles; f++) {
File fl = new File(targetDir+filenamePreffix+String.valueOf(f)+".txt");
FileReader frd = new FileReader(fl);
BufferedReader brd = new BufferedReader(frd);
while ((line=brd.readLine())!=null)
doSomethingWithLine(line);
brd.close();
frd.close();
}
}
public final void doSomethingWithLine(String line) throws ParseException {
// Example of what to do for each line
String[] fields = line.split(";");
Date dt = fmt.parse(fields[0]);
double d = Double.parseDouble(fields[1]);
int t = Integer.parseInt(fields[2]);
if (fields[3].equals("overrun"))
overrunCount++;
}
lineReaderParseParallel
public void lineReaderParseParallel(final String targetDir, final int numberOfFiles, final int degreeOfParalelism) throws IOException, ParseException, InterruptedException {
Thread[] pool = new Thread[degreeOfParalelism];
int batchSize = numberOfFiles / degreeOfParalelism;
for (int b=0; b<degreeOfParalelism; b++) {
pool[b] = new LineReaderParseThread(targetDir, b*batchSize, b*batchSize+b*batchSize);
pool[b].start();
}
for (int b=0; b<degreeOfParalelism; b++)
pool[b].join();
}
class LineReaderParseThread extends Thread {
private String targetDir;
private int fileFrom;
private int fileTo;
private DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private int overrunCounter = 0;
public LineReaderParseThread(String targetDir, int fileFrom, int fileTo) {
this.targetDir = targetDir;
this.fileFrom = fileFrom;
this.fileTo = fileTo;
}
private void doSomethingWithTheLine(String line) throws ParseException {
String[] fields = line.split(DL);
Date dt = fmt.parse(fields[0]);
double d = Double.parseDouble(fields[1]);
int t = Integer.parseInt(fields[2]);
if (fields[3].equals("overrun"))
overrunCounter++;
}
@Override
public void run() {
String line;
for (int f=fileFrom; f<fileTo; f++) {
File fl = new File(targetDir+filenamePreffix+String.valueOf(f)+".txt");
try {
FileReader frd = new FileReader(fl);
BufferedReader brd = new BufferedReader(frd);
while ((line=brd.readLine())!=null) {
doSomethingWithTheLine(line);
}
brd.close();
frd.close();
} catch (IOException | ParseException ioe) { }
}
}
}
nioFilesParse
public void nioFilesParse(final String targetDir, final int numberOfFiles) throws IOException, ParseException {
for (int f=0; f<numberOfFiles; f++) {
Path ph = Paths.get(targetDir+filenamePreffix+String.valueOf(f)+".txt");
Consumer<String> action = new LineConsumer();
Stream<String> lines = Files.lines(ph);
lines.forEach(action);
lines.close();
}
}
class LineConsumer implements Consumer<String> {
@Override
public void accept(String line) {
// What to do for each line
String[] fields = line.split(DL);
if (fields.length>1) {
try {
Date dt = fmt.parse(fields[0]);
}
catch (ParseException e) {
}
double d = Double.parseDouble(fields[1]);
int t = Integer.parseInt(fields[2]);
if (fields[3].equals("overrun"))
overrunCount++;
}
}
}
nioAsyncParse
public void nioAsyncParse(final String targetDir, final int numberOfFiles, final int numberOfThreads, final int bufferSize) throws IOException, ParseException, InterruptedException {
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(numberOfThreads);
ConcurrentLinkedQueue<ByteBuffer> byteBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
for (int b=0; b<numberOfThreads; b++)
byteBuffers.add(ByteBuffer.allocate(bufferSize));
for (int f=0; f<numberOfFiles; f++) {
consumerThreads.acquire();
String fileName = targetDir+filenamePreffix+String.valueOf(f)+".txt";
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(fileName), EnumSet.of(StandardOpenOption.READ), pool);
BufferConsumer consumer = new BufferConsumer(byteBuffers, fileName, bufferSize);
channel.read(consumer.buffer(), 0l, channel, consumer);
}
consumerThreads.acquire(numberOfThreads);
}
class BufferConsumer implements CompletionHandler<Integer, AsynchronousFileChannel> {
private ConcurrentLinkedQueue<ByteBuffer> buffers;
private ByteBuffer bytes;
private String file;
private StringBuffer chars;
private int limit;
private long position;
private DateFormat frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public BufferConsumer(ConcurrentLinkedQueue<ByteBuffer> byteBuffers, String fileName, int bufferSize) {
buffers = byteBuffers;
bytes = buffers.poll();
if (bytes==null)
bytes = ByteBuffer.allocate(bufferSize);
file = fileName;
chars = new StringBuffer(bufferSize);
frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
limit = bufferSize;
position = 0l;
}
public ByteBuffer buffer() {
return bytes;
}
@Override
public synchronized void completed(Integer result, AsynchronousFileChannel channel) {
if (result!=-1) {
bytes.flip();
final int len = bytes.limit();
int i = 0;
try {
for (i = 0; i < len; i++) {
byte by = bytes.get();
if (by=='\n') {
// ***
// The code used to process the line goes here
chars.setLength(0);
}
else {
chars.append((char) by);
}
}
}
catch (Exception x) {
System.out.println(
"Caught exception " + x.getClass().getName() + " " + x.getMessage() +
" i=" + String.valueOf(i) + ", limit=" + String.valueOf(len) +
", position="+String.valueOf(position));
}
if (len==limit) {
bytes.clear();
position += len;
channel.read(bytes, position, channel, this);
}
else {
try {
channel.close();
}
catch (IOException e) {
}
consumerThreads.release();
bytes.clear();
buffers.add(bytes);
}
}
else {
try {
channel.close();
}
catch (IOException e) {
}
consumerThreads.release();
bytes.clear();
buffers.add(bytes);
}
}
@Override
public void failed(Throwable e, AsynchronousFileChannel channel) {
}
};
ПОЛНАЯ ПЕРЕХОДНАЯ РЕАЛИЗАЦИЯ ВСЕХ СЛУЧАЙ
https://github.com/sergiomt/javaiobenchmark/blob/master/FileReadBenchmark.java