Запись файлов на основе асинхронной очереди - PullRequest
0 голосов
/ 29 мая 2018

Я пишу многопоточную программу на Java, и у меня работает один поток записи.Как только потоки обработали порцию данных, они записывают в LinkedBlockingQueue в потоке записи с помощью метода synchronized writeToFile в writer.

Идея состоит в том, что, как только очередь доберется до определенногоразмер, потоки заблокированы от добавления в очередь, и данные выводятся в файл.Я обрабатываю большие объемы данных (20-50 ГБ за раз), и это помогает уменьшить объем используемой оперативной памяти.(Если есть лучший способ сделать это, я открыт для предложений!)

Проблема, с которой я сталкиваюсь, заключается в том, что, несмотря на синхронизацию метода writeToFile и запись в файл через emptyQueuesв блоке synchonrized потоки все еще добавляются в очередь, пока поток записывает в файл.

@Component("writer")
public class WriterImpl implements Writer {

private boolean isRunning;
private PrintWriter fastQWriter1, fastQWriter2;
private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
private final int MAX_QUEUE_SIZE = 5000;

@Override
public void setOutputFiles(File fastQ1, File fastQ2) {
    try{
        fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
        fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
    }catch (IOException ioe){
        System.out.println(ioe.getMessage());
    }
}

@Override
public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
    fastQQueue1.add(one);
    fastQQueue2.add(two);
}

@Override
public void close() {
    isRunning = false;

    emptyQueues();

    fastQWriter1.flush();
    fastQWriter1.close();
    fastQWriter2.flush();
    fastQWriter2.close();
}

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time

            synchronized (fastQQueue1){
                synchronized (fastQQueue2){
                    emptyQueues();
                }
            }
        }
    }
}

private void emptyQueues() {
    while(fastQQueue1.size() > 0){
        FastQRecord one = fastQQueue1.poll();

        fastQWriter1.println(one.getId());
        fastQWriter1.println(one.getRawSequence());
        fastQWriter1.println(one.getPlus());
        fastQWriter1.println(one.getQualityString());
    }

    while(fastQQueue2.size() > 0){

        FastQRecord two = fastQQueue2.poll();
        fastQWriter2.println(two.getId());
        fastQWriter2.println(two.getRawSequence());
        fastQWriter2.println(two.getPlus());
        fastQWriter2.println(two.getQualityString());

    }
}
}  

FastQRecord - это простой POJO, содержащий данные, которые мне нужно записать в файл:

public class FastQRecord {

private String id;
private String rawSequence;
private char plus;
private String qualityString;

public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
    this.id = id;
    this.rawSequence = rawSequence;
    this.plus = plus;
    this.qualityString = qualityString;
}

public String getId() {
    return id;
}

public void setId(String id) {
    this.id = id;
}

public String getRawSequence() {
    return rawSequence;
}

public void setRawSequence(String rawSequence) {
    this.rawSequence = rawSequence;
}

public char getPlus() {
    return plus;
}

public void setPlus(char plus) {
    this.plus = plus;
}

public String getQualityString() {
    return qualityString;
}

public void setQualityString(String qualityString) {
    this.qualityString = qualityString;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    FastQRecord that = (FastQRecord) o;

    return id.equals(that.id);
}

@Override
public int hashCode() {
    return id.hashCode();
}

@Override
public String toString() {
    return "FastQRecord{" +
            "id=" + id + '\n' +
            ", rawSequence=" + rawSequence + '\n' +
            ", plus=" + plus + '\n' +
            ", qualityString=" + qualityString + '\n' +
            '}';
}
}

1 Ответ

0 голосов
/ 29 мая 2018

Вы можете воспользоваться интерфейсом BlockingQueue (т.е. заблокировать поток, если в очереди нет места), используя метод put() вместо add(), который являетсяунаследовано от Collection.

Но для того, чтобы заставить поток ждать операции put(), ваша очередь должна знать ее максимальный размер, объявив ее как LinkedBlockingQueue<>(MAX_QUEUE_SIZE).Если вы не укажете максимальную емкость очереди, будет предполагаться, что это Integer.MAX_VALUE

. Я также предлагаю вам синхронизировать ваш доступ в очередь перед проверкой ее размера (илиесли он заполнен) и ваш метод run() будет выглядеть примерно так:

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        synchronized(fastQQueue1){
            if(fastQQueue1.remainingCapacity() == 0){ //empty queues - 5000 record pairs at a time

                synchronized (fastQQueue1){
                    synchronized (fastQQueue2){
                        emptyQueues();
                    }
                }
            }
        }
    }
}

Аналогичное изменение будет применено к вашему методу emptyQueues().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...