Я пишу многопоточную программу на Java, и у меня работает один поток записи.Как только потоки обработали порцию данных, они записывают в LinkedBlockingQueue
в потоке записи с помощью метода synchronized writeToFile
в writer
.
Идея состоит в том, что, как только очередь доберется до определенногоразмер, потоки заблокированы от добавления в очередь, и данные выводятся в файл.Я обрабатываю большие объемы данных (20-50 ГБ за раз), и это помогает уменьшить объем используемой оперативной памяти.(Если есть лучший способ сделать это, я открыт для предложений!)
Проблема, с которой я сталкиваюсь, заключается в том, что, несмотря на синхронизацию метода writeToFile
и запись в файл через emptyQueues
в блоке synchonrized
потоки все еще добавляются в очередь, пока поток записывает в файл.
@Component("writer")
public class WriterImpl implements Writer {
private boolean isRunning;
private PrintWriter fastQWriter1, fastQWriter2;
private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
private final int MAX_QUEUE_SIZE = 5000;
@Override
public void setOutputFiles(File fastQ1, File fastQ2) {
try{
fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
}catch (IOException ioe){
System.out.println(ioe.getMessage());
}
}
@Override
public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
fastQQueue1.add(one);
fastQQueue2.add(two);
}
@Override
public void close() {
isRunning = false;
emptyQueues();
fastQWriter1.flush();
fastQWriter1.close();
fastQWriter2.flush();
fastQWriter2.close();
}
@Override
public void run() {
isRunning = true;
while(isRunning){
//do stuff
if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time
synchronized (fastQQueue1){
synchronized (fastQQueue2){
emptyQueues();
}
}
}
}
}
private void emptyQueues() {
while(fastQQueue1.size() > 0){
FastQRecord one = fastQQueue1.poll();
fastQWriter1.println(one.getId());
fastQWriter1.println(one.getRawSequence());
fastQWriter1.println(one.getPlus());
fastQWriter1.println(one.getQualityString());
}
while(fastQQueue2.size() > 0){
FastQRecord two = fastQQueue2.poll();
fastQWriter2.println(two.getId());
fastQWriter2.println(two.getRawSequence());
fastQWriter2.println(two.getPlus());
fastQWriter2.println(two.getQualityString());
}
}
}
FastQRecord
- это простой POJO, содержащий данные, которые мне нужно записать в файл:
public class FastQRecord {
private String id;
private String rawSequence;
private char plus;
private String qualityString;
public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
this.id = id;
this.rawSequence = rawSequence;
this.plus = plus;
this.qualityString = qualityString;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getRawSequence() {
return rawSequence;
}
public void setRawSequence(String rawSequence) {
this.rawSequence = rawSequence;
}
public char getPlus() {
return plus;
}
public void setPlus(char plus) {
this.plus = plus;
}
public String getQualityString() {
return qualityString;
}
public void setQualityString(String qualityString) {
this.qualityString = qualityString;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FastQRecord that = (FastQRecord) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public String toString() {
return "FastQRecord{" +
"id=" + id + '\n' +
", rawSequence=" + rawSequence + '\n' +
", plus=" + plus + '\n' +
", qualityString=" + qualityString + '\n' +
'}';
}
}