Синхронизированный механизм записи файлов во многие файлы и в то же время в Java - PullRequest
0 голосов
/ 26 ноября 2018

Я хотел бы создать механизм записи синхронизированных файлов для приложения Spring.У меня есть около 10 000 000 jsons, которые должны быть сохранены в отдельных файлах, например:

  • текст "abc" из json: {"id": "1", "text": "abc"} долженбыть сохраненным в файл "1.json"
  • текст "pofd" из json: {"id": "2", "text": "pofd"} должен быть сохранен в "2.json"файл

Другие требования:

  • можно записать несколько файлов одновременно
  • один файл может быть обновлен несколькими потоками нав то же время (много jsons с одинаковым идентификатором, но с другим текстом)

Я создал FileWriterProxy (синглтон-бин), который является основным компонентом для сохранения файлов.Он загружает ленивый компонент FileWriter (прототип bean), который отвечает за запись в файл (у него есть метод синхронизированной записи).Каждый объект FileWriter представляет отдельный файл.Я подозреваю, что мое решение не в безопасности потоков.Давайте рассмотрим следующий сценарий:

  1. есть 3 потока (Thread1, Thread2, Thread3), которые хотят записать в один и тот же файл (1.json), все они обращаются к методу записи из компонента FileWriterProxy
  2. Thread1 получает правильный FileWriter
  3. Thread1 блокирует FileWriter для файла 1.json
  4. Thread1 записывает в файл 1.json
  5. Thread1 завершает работузапись в файл и удаление FileWriter из ConcurrentHashMap
  6. , в то время как Thread2 получает FileWriter для файла 1.json и ожидает, пока Thread1 снимет блокировку
  7. Thread1 снимает блокировку и удаляет FileWriterиз ConcurrentHashMap
  8. теперь Thread2 может записывать в файл 1.json (он имеет FileWriter, который был удален из ConcurrentHashMap)
  9. Thread3 получает FileWriter для 1.json (новый! old FileWriter)был удален Thread1)
  10. Thread2 и Thread3 записывают в один и тот же файл в одно и то же времяпотому что они заблокированы на разных объектах FileWriters

Пожалуйста, исправьте меня, если я ошибаюсь.Как я могу исправить мою реализацию?

FileWriterProxy:

@Component
public class FileWriterProxy {
    private final BeanFactory beanFactory;
    private final Map<String, FileWriter> filePathsMappedToFileWriters = new ConcurrentHashMap<>();

    public FileWriterProxy(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    public void write(Path path, String data) {
        FileWriter fileWriter = getFileWriter(path);
        fileWriter.write(data);
        removeFileWrite(path);
    }

    private FileWriter getFileWriter(Path path) {
        return filePathsMappedToFileWriters.computeIfAbsent(path.toString(), e -> beanFactory.getBean(FileWriter.class, path));
    }

    private void removeFileWrite(Path path) {
        filePathsMappedToFileWriters.remove(path.toString());
    }

}

FileWriterProxyTest:

@RunWith(SpringRunner.class)
@SpringBootTest
public class FileWriterProxyTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final String FILE_NAME = "filename.txt";
    private File baseDirectory;
    private Path path;

    @Autowired
    private FileWriterProxy fileWriterProxy;

    @Before
    public void setUp() {
        baseDirectory = temporaryFolder.getRoot();
        path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
    }

    @Test
    public void writeToFile() throws IOException {
        String data = "test";
        fileWriterProxy.write(path, data);
        String fileContent = new String(Files.readAllBytes(path));
        assertEquals(data, fileContent);
    }

    @Test
    public void concurrentWritesToFile() throws InterruptedException {
        Path path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
        List<Task> tasks = Arrays.asList(
                new Task(path, "test1"),
                new Task(path, "test2"),
                new Task(path, "test3"),
                new Task(path, "test4"),
                new Task(path, "test5"));
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Future<Boolean>> futures = executorService.invokeAll(tasks);

        wait(futures);
    }

    @Test
    public void manyRandomWritesToFiles() throws InterruptedException {
        List<Task> tasks = createTasks(1000);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Future<Boolean>> futures = executorService.invokeAll(tasks);
        wait(futures);
    }

    private void wait(List<Future<Boolean>> tasksFutures) {
        tasksFutures.forEach(e -> {
            try {
                e.get(10, TimeUnit.SECONDS);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        });
    }

    private List<Task> createTasks(int number) {
        List<Task> tasks = new ArrayList<>();

        IntStream.range(0, number).forEach(e -> {
            String fileName = generateFileName();
            Path path = Paths.get(baseDirectory.getAbsolutePath(), fileName);
            tasks.add(new Task(path, "test"));
        });

        return tasks;
    }

    private String generateFileName() {
        int length = 10;
        boolean useLetters = true;
        boolean useNumbers = false;
        return RandomStringUtils.random(length, useLetters, useNumbers) + ".txt";
    }

    private class Task implements Callable<Boolean> {
        private final Path path;
        private final String data;

        Task(Path path, String data) {
            this.path = path;
            this.data = data;
        }

        @Override
        public Boolean call() {
            fileWriterProxy.write(path, data);
            return true;
        }
    }
}

Конфигурация:

@Configuration
public class Config {

    @Bean
    @Lazy
    @Scope("prototype")
    public FileWriter fileWriter(Path path) {
        return new FileWriter(path);
    }

}

FileWriter:

public class FileWriter {
    private static final Logger logger = LoggerFactory.getLogger(FileWriter.class);

    private final Path path;

    public FileWriter(Path path) {
        this.path = path;
    }

    public synchronized void write(String data) {
        String filePath = path.toString();
        try {
            Files.write(path, data.getBytes());
            logger.info("File has been saved: {}", filePath);
        } catch (IOException e) {
            logger.error("Error occurred while writing to file: {}", filePath);
        }
    }

}
...