Я хотел бы создать механизм записи синхронизированных файлов для приложения Spring.У меня есть около 10 000 000 jsons, которые должны быть сохранены в отдельных файлах, например:
- текст "abc" из json: {"id": "1", "text": "abc"} долженбыть сохраненным в файл "1.json"
- текст "pofd" из json: {"id": "2", "text": "pofd"} должен быть сохранен в "2.json"файл
Другие требования:
- можно записать несколько файлов одновременно
- один файл может быть обновлен несколькими потоками нав то же время (много jsons с одинаковым идентификатором, но с другим текстом)
Я создал FileWriterProxy (синглтон-бин), который является основным компонентом для сохранения файлов.Он загружает ленивый компонент FileWriter (прототип bean), который отвечает за запись в файл (у него есть метод синхронизированной записи).Каждый объект FileWriter представляет отдельный файл.Я подозреваю, что мое решение не в безопасности потоков.Давайте рассмотрим следующий сценарий:
- есть 3 потока (Thread1, Thread2, Thread3), которые хотят записать в один и тот же файл (1.json), все они обращаются к методу записи из компонента FileWriterProxy
- Thread1 получает правильный FileWriter
- Thread1 блокирует FileWriter для файла 1.json
- Thread1 записывает в файл 1.json
- Thread1 завершает работузапись в файл и удаление FileWriter из ConcurrentHashMap
- , в то время как Thread2 получает FileWriter для файла 1.json и ожидает, пока Thread1 снимет блокировку
- Thread1 снимает блокировку и удаляет FileWriterиз ConcurrentHashMap
- теперь Thread2 может записывать в файл 1.json (он имеет FileWriter, который был удален из ConcurrentHashMap)
- Thread3 получает FileWriter для 1.json (новый! old FileWriter)был удален Thread1)
- Thread2 и Thread3 записывают в один и тот же файл в одно и то же времяпотому что они заблокированы на разных объектах FileWriters
Пожалуйста, исправьте меня, если я ошибаюсь.Как я могу исправить мою реализацию?
FileWriterProxy:
@Component
public class FileWriterProxy {
private final BeanFactory beanFactory;
private final Map<String, FileWriter> filePathsMappedToFileWriters = new ConcurrentHashMap<>();
public FileWriterProxy(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
public void write(Path path, String data) {
FileWriter fileWriter = getFileWriter(path);
fileWriter.write(data);
removeFileWrite(path);
}
private FileWriter getFileWriter(Path path) {
return filePathsMappedToFileWriters.computeIfAbsent(path.toString(), e -> beanFactory.getBean(FileWriter.class, path));
}
private void removeFileWrite(Path path) {
filePathsMappedToFileWriters.remove(path.toString());
}
}
FileWriterProxyTest:
@RunWith(SpringRunner.class)
@SpringBootTest
public class FileWriterProxyTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final String FILE_NAME = "filename.txt";
private File baseDirectory;
private Path path;
@Autowired
private FileWriterProxy fileWriterProxy;
@Before
public void setUp() {
baseDirectory = temporaryFolder.getRoot();
path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
}
@Test
public void writeToFile() throws IOException {
String data = "test";
fileWriterProxy.write(path, data);
String fileContent = new String(Files.readAllBytes(path));
assertEquals(data, fileContent);
}
@Test
public void concurrentWritesToFile() throws InterruptedException {
Path path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
List<Task> tasks = Arrays.asList(
new Task(path, "test1"),
new Task(path, "test2"),
new Task(path, "test3"),
new Task(path, "test4"),
new Task(path, "test5"));
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<Boolean>> futures = executorService.invokeAll(tasks);
wait(futures);
}
@Test
public void manyRandomWritesToFiles() throws InterruptedException {
List<Task> tasks = createTasks(1000);
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<Boolean>> futures = executorService.invokeAll(tasks);
wait(futures);
}
private void wait(List<Future<Boolean>> tasksFutures) {
tasksFutures.forEach(e -> {
try {
e.get(10, TimeUnit.SECONDS);
} catch (Exception e1) {
e1.printStackTrace();
}
});
}
private List<Task> createTasks(int number) {
List<Task> tasks = new ArrayList<>();
IntStream.range(0, number).forEach(e -> {
String fileName = generateFileName();
Path path = Paths.get(baseDirectory.getAbsolutePath(), fileName);
tasks.add(new Task(path, "test"));
});
return tasks;
}
private String generateFileName() {
int length = 10;
boolean useLetters = true;
boolean useNumbers = false;
return RandomStringUtils.random(length, useLetters, useNumbers) + ".txt";
}
private class Task implements Callable<Boolean> {
private final Path path;
private final String data;
Task(Path path, String data) {
this.path = path;
this.data = data;
}
@Override
public Boolean call() {
fileWriterProxy.write(path, data);
return true;
}
}
}
Конфигурация:
@Configuration
public class Config {
@Bean
@Lazy
@Scope("prototype")
public FileWriter fileWriter(Path path) {
return new FileWriter(path);
}
}
FileWriter:
public class FileWriter {
private static final Logger logger = LoggerFactory.getLogger(FileWriter.class);
private final Path path;
public FileWriter(Path path) {
this.path = path;
}
public synchronized void write(String data) {
String filePath = path.toString();
try {
Files.write(path, data.getBytes());
logger.info("File has been saved: {}", filePath);
} catch (IOException e) {
logger.error("Error occurred while writing to file: {}", filePath);
}
}
}