Исключение NPE при чтении из очереди при удалении старых файлов данных очереди - PullRequest
0 голосов
/ 18 сентября 2018

Я открыл проблему для этого: https://github.com/OpenHFT/Chronicle-Queue/issues/534

Я пытаюсь реализовать периодическую логику очистки файла старой очереди с помощью StoreFileListener.Я использую последний выпуск net.openhft :хроника-очередь: 5.16.13.У меня возникает проблема: после того, как файл цикла прокрутки прокручивается из-за того, что активен следующий цикл, я удаляю файл очереди, который только что был выпущен в StoreFileListener, затем я создаю нового тейлера и пытаюсь прочитать сообщение.Он выдает ниже NPE:

То же самое происходит, если я создаю совершенно новую очередь, указывающую на ту же директорию очереди, и создаю тейлера.

вижу ниже исключение NPE при попытке реализовать логику очистки файла очереди:

java.lang.NullPointerException
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.inACycle(SingleChronicleQueueExcerpts.java:1198)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:1000)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:942)
at net.openhft.chronicle.wire.MarshallableIn.readText(MarshallableIn.java:95)
at com.test.edge.api.queue.TestDeleteQueueFile.testQueueFileDeletionWhileInUse(TestDeleteQueueFile.java:133)

Контрольный пример для воспроизведения приведен ниже:

package com.test.edge.api.queue;

import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestDeleteQueueFile {

    private Path tempQueueDir;

    @Before
    public void setUp() throws Exception {
        tempQueueDir = Files.createTempDirectory("unitTestQueueDir");
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(tempQueueDir.toFile());
        Assert.assertFalse(tempQueueDir.toFile().exists());
        System.out.println("Deleted " + tempQueueDir.toFile().getAbsolutePath());
    }

    @Test
    public void testQueueFileDeletionWhileInUse() throws Exception {
        SetTimeProvider timeProvider = new SetTimeProvider();

        String queueName = "unitTestQueue";

        QueueStoreFileListener listener = new QueueStoreFileListener(queueName);

        try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
                timeProvider(timeProvider).storeFileListener(listener)
                .build()) {

            ExcerptAppender appender = queue.acquireAppender();

            System.out.println("first index : " + queue.firstIndex());
            Assert.assertEquals(Long.MAX_VALUE, queue.firstIndex());

            //write 10 records should go to first day file
            for (int i = 0; i < 10; i++) {
                appender.writeText("test");
            }

            long indexAfter10Records = appender.lastIndexAppended();
            System.out.println("index after writing 10 records: " + indexAfter10Records);

            //roll to next day file
            timeProvider.advanceMillis(24 * 60 * 60 * 1000);


            //write 5 records in next file
            for (int i = 0; i < 5; i++) {
                appender.writeText("test2");
            }

            Map<String, List> queueToRollFilesOnAcquireMap = listener.getQueueToRollFilesOnAcquireMap();
            Map<String, List> queueToRollFilesOnReleaseMap = listener.getQueueToRollFilesOnReleaseMap();

            Assert.assertEquals(1, queueToRollFilesOnAcquireMap.size());
            List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
            Assert.assertEquals(1, files.size());
            String secondFile = files.get(0);

            //other will have 1 as only first file is released
            files = queueToRollFilesOnReleaseMap.get(queueName);
            Assert.assertEquals(1, files.size());
            String firstFile = files.get(0);

            Assert.assertNotEquals(firstFile, secondFile);


            long firstIndex = queue.firstIndex();


            long indexAfter5Records = appender.lastIndexAppended();
            System.out.println("index after writing 5 records: " + indexAfter5Records);

            //now lets create one reader which will read all content
            ExcerptTailer excerptTailer = queue.createTailer();
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals("test", excerptTailer.readText());
            }

            System.out.println("index after reading 10 records: " + excerptTailer.index());
            Assert.assertEquals(firstIndex, excerptTailer.index() - 10);
            for (int i = 0; i < 5; i++) {
                Assert.assertEquals("test2", excerptTailer.readText());
            }

            System.out.println("index after reading 5 records: " + excerptTailer.index());
            Assert.assertEquals(indexAfter5Records, excerptTailer.index() - 1);

            //lets delete first file
            System.out.println("Deleting first release file: " + firstFile);
            Files.delete(Paths.get(firstFile));


            long firstIndex3 = queue.firstIndex();
            Assert.assertEquals(firstIndex, firstIndex3);

            // and create a tailer it should only read
            //data in second file
            ExcerptTailer excerptTailer2 = queue.createTailer();
            System.out.println("index before reading 5: " + excerptTailer2.index());

            //AFTER CREATING A BRAND NEW TAILER, BELOW ASSERTION ALSO FAILS
            //WAS EXPECTING THAT TAILER CAN READ FROM START OF QUEUE BUT INDEX IS LONG.MAX

            //Assert.assertEquals(indexAfter5Records -5, excerptTailer2.index() -1);

            //BELOW THROWS NPE, WAS EXPECTING THAT WE CAN READ FROM SECOND DAILY QUEUE FILE
            System.out.println("excerptTailer2: " + excerptTailer2.peekDocument());
            for (int i = 0; i < 5; i++) {
                Assert.assertEquals("test2", excerptTailer2.readText());
            }

            //SAME ERROR WHEN CREATING A BRAND NEW QUEUE AND TRYING TO READ IT

//            // create brand new queue and read see how it behaves
//            ChronicleQueue queue2 =  SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
//                    timeProvider(timeProvider).storeFileListener(listener)
//                    .build();
//
//            long firstIndex2 = queue2.firstIndex();
//            Assert.assertNotEquals(firstIndex, firstIndex2);
//            Assert.assertNotEquals(indexAfter5Records, firstIndex2);
//
//            ExcerptTailer excerptTailer3 = queue.createTailer();
//            for (int i = 0; i < 5; i++) {
//                Assert.assertEquals("test2", excerptTailer3.readText());
//            }

        }

    }

    final class QueueStoreFileListener implements StoreFileListener {

        private String queueName;
        private Map<String, List> queueToRollFilesOnReleaseMap = new HashMap<>();
        private Map<String, List> queueToRollFilesOnAcquireMap = new HashMap<>();

        public QueueStoreFileListener(String queueName) {
            this.queueName = queueName;
        }

        @Override
        public void onReleased(int cycle, File file) {
            System.out.println("onReleased called cycle: " + cycle + "file: " + file);

            List<String> files = queueToRollFilesOnReleaseMap.get(queueName);
            if (files == null) {
                files = new ArrayList<>();
            }

            String fileAbsPath = file.getAbsolutePath();
            if (!files.contains(fileAbsPath)) {
                files.add(fileAbsPath);
            }
            queueToRollFilesOnReleaseMap.put(queueName, files);

            //update acquire file map
            List<String> acqfiles = queueToRollFilesOnAcquireMap.get(queueName);
            acqfiles.remove(file.getAbsolutePath());
            queueToRollFilesOnAcquireMap.put(queueName, acqfiles);

        }

        @Override
        public void onAcquired(int cycle, File file) {
            System.out.println("onAcquired called cycle: " + cycle + "file: " + file);

            List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
            if (files == null) {
                files = new ArrayList<>();
            }

            String fileAbsPath = file.getAbsolutePath();
            if (!files.contains(fileAbsPath)) {
                files.add(fileAbsPath);
            }

            queueToRollFilesOnAcquireMap.put(queueName, files);


        }

        public Map<String, List> getQueueToRollFilesOnAcquireMap() {
            return queueToRollFilesOnAcquireMap;
        }

        public Map<String, List> getQueueToRollFilesOnReleaseMap() {
            return queueToRollFilesOnReleaseMap;
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 11 января 2019

Я столкнулся с той же проблемой, и я нашел решение. Попробуйте обновить список каталогов, прежде чем создавать тейлера. Например, вы можете добавить

queue.refreshDirectlyListing();

до

ExcerptTailer excerptTailer2 = queue.createTailer();

Это заставит перечитать список файлов очереди. А также вы можете проверить реализацию этого метода, чтобы увидеть, почему листинг не обновляется по умолчанию:

    private void setFirstAndLastCycle() {
    long now = time.currentTimeMillis();
    if (now <= firstAndLastCycleTime) {
        return;
    }

    firstCycle = directoryListing.getMinCreatedCycle();
    lastCycle = directoryListing.getMaxCreatedCycle();

    firstAndLastCycleTime = now;
}
0 голосов
/ 29 сентября 2018

В этой строке

Assert.assertEquals(indexAfter5Records -5, excerptTailer2.index() -1);

indexAfter5Records равно

0x100000004

, как и ожидалось.Это связано с тем, что старшие 32-битные содержат номер цикла, равный 1, а младшие биты содержат запись с номером цикла, равным 4, который является 5-й записью, начинающейся с 0.

* 1012.* excerptTailer2.index() должен повернуть к началу.На моей машине с Windows это 0, поскольку Windows не всегда позволяет вам удалить файл с отображенной памятью до тех пор, пока процесс не будет завершен.В Linux это не проблема.

В моем случае, поскольку файл не может быть удален, я использовал

excerptTailer2.moveToIndex(0x1_0000_0000L);

, и он читал ожидаемые текстовые сообщения.

Недавно была версия, в которой readText / writeText была сломана, но это должно быть исправлено в 5.16.15.

Если вы ищете очень стабильную версию, я предлагаю 4.15.7.Если вы хотите использовать последнюю версию, я бы взял версию, у которой нет следующего выпуска на следующий день;)

Мы надеемся сделать 5.16.x нашей стабильной версией в ближайшее время, выпустить 6.17.xкоторый будет включать поддержку Java 11.

...