Могу ли я сделать FileInputStream.read блоком, пока FileOutputStream для того же файла не будет закрыт? - PullRequest
0 голосов
/ 10 января 2020

В моем приложении я получаю данные, которые хочу сохранить в файле, а также делаю некоторые вычисления с ним. Как получение, так и расчет могут длиться долго, поэтому я хочу сделать это асинхронно. В приведенном ниже списке показаны мои основные настройки c: thread1 производит некоторые данные и сохраняет их в файле. thread2 читает файл и обрабатывает данные.

    Thread thread1 = new Thread( () -> {
      try {
        BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
        for( int i = 0; i < 10; i++ ) {
          //producing data...
          out.write( ( "hello " + i + "\n" ).getBytes() );
          out.flush();
          //Thread.sleep( 10 );
        }
        out.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread1.start();

    Thread thread2 = new Thread( () -> {
      try {
        BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
        int b = in.read();
        while( b != -1 ) {
          //do some calculation with data
          System.out.print( (char)b );
          b = in.read();
        }
        in.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread2.start();

Я полагаю, что чтение и запись одновременно в один и тот же файл в порядке в соответствии с этим вопросом: FileInputStream и FileOutputStream в один и тот же файл: read () гарантированно видит все write (), которые "случались раньше"? или я что-то здесь упустил?

Выполнение перечисленного выше выдает результат, как и ожидалось:

hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

Но если поток чтения по какой-то причине быстрее, чем запись (можно смоделировать, раскомментировав строку Thread.sleep в thread1), программа чтения читает EOF (-1) и завершает работу до того, как файл полностью написано. Выводится только одна строка:

hello 0

Однако автор все еще выводит весь вывод в «тестовом» файле.

Теперь я хотел бы сделать блок in.read() до тех пор, пока FileOutputStream в Тема1 закрыта. Я думал, что это можно сделать, избегая того, чтобы EOF помещался в конец файла, пока out не будет закрыт. Это правда, и если да, то как я могу это сделать? Или есть лучшие подходы?

1 Ответ

1 голос
/ 10 января 2020

Читатель (потребитель) может ожидать записи (производителя), даже если интерфейс является файлом. Но в целом вам было бы гораздо лучше использовать очередь и следовать шаблону «производитель / потребитель».

В любом случае, в этом случае грубая процедура «ожидания большего количества ввода» включает в себя только два значения Atomic:

  • один для отслеживания количества записанных байтов (AtomicInteger)
  • один для указания того, что больше нет доступных байтов (AtomicBoolean)
Переменные

Atomi c могут быть разделены между потоками: оба потока всегда будут видеть последнее значение значения atomi c. Затем пишущий может обновить число байтов, записанных с помощью AtomicInteger, и читатель может решить дождаться большего ввода. Автор также может указать, не будет ли больше байтов записываться через AtomicBoolean, и читатель может использовать эту информацию для чтения до конца файла.

Еще одна вещь, о которой следует помнить, это запуск потока не находится под вашим контролем: ваша операционная система определит, когда потоки действительно начнут работать. Чтобы дать потокам разумную возможность работать в одно и то же время, используйте «startLatch», как показано в приведенном ниже коде.

Приведенный ниже демонстрационный код работает и должен дать хорошее представление о том, как можно сделать Поток читателя ожидает дополнительной информации от писателя.


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ReadWhileWrite {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch startLatch = new CountDownLatch(2);
            Path testFile = Paths.get("test-read-while-write.txt");
            testFile.toFile().delete();
            int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes. 

            CountDownLatch testFileExists = new CountDownLatch(1);
            AtomicInteger bytesWritten = new AtomicInteger();
            AtomicBoolean writeFinished = new AtomicBoolean();

            // Writer
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus reader start.");
                    }
                    try (OutputStream out = Files.newOutputStream(testFile)) {
                        testFileExists.countDown();
                        int maxLoops = 10;
                        IntStream.range(0, maxLoops).forEach(i -> {
                            byte[] msg = ("hello " + i + "\n").getBytes(StandardCharsets.UTF_8);
                            try {
                                out.write(msg);
                                out.flush();
                                bytesWritten.addAndGet(msg.length);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
                                try {
                                    Thread.sleep(fakeSlowWriteMs);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writeFinished.set(true);
            });
            // Reader
            CountDownLatch doneLatch = new CountDownLatch(1);
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus writer start.");
                    }
                    int bytesRead = 0;
                    int bytesRequired = 1; // Number of bytes read from file in one go.
                    int maxWaitTimeMs = 1000;
                    if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Writer did not open file for reading within 500 ms.");
                    }
                    try (InputStream in = Files.newInputStream(testFile)) {
                        boolean eof = false;
                        while (!eof) {
                            if (!writeFinished.get()) {
                                if (bytesWritten.get() - bytesRead < bytesRequired) {
                                    int sleepTimeTotal = 0;
                                    while (!writeFinished.get()) {
                                        Thread.sleep(1);
                                        if (bytesWritten.get() - bytesRead >= bytesRequired) {
                                            break; // break the waiting loop, read the available bytes.
                                        }
                                        sleepTimeTotal += 1;
                                        if (sleepTimeTotal >= maxWaitTimeMs) {
                                            throw new RuntimeException("No bytes available to read within waiting time.");
                                        }
                                    }
                                }
                            }
                            int b = in.read();
                            bytesRead += 1;
                            if (b < 0) {
                                eof = true;
                            } else {
                                System.out.print( (char) b);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                doneLatch.countDown();
            });
            if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        executor.shutdownNow();
        System.out.println("\nFinished.");
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...