Хроника Потребитель не правильно читает записи? - PullRequest
0 голосов
/ 11 сентября 2018

Я использую хроническую очередь (5.16.13) для записи и чтения значений json в файл хроники. Для записи объектов я использую следующее в цикле

try (final DocumentContext dc = appender.writingDocument()) {
        dc.wire().write(() -> "msg").text("Hallo asdf");
        System.out.println("your data was store to index="+ dc.index());
        return true;
    } catch (Exception e) {
        logger.warn("Unable to store value to chronicle", e);
        return false;
    }

и для чтения элементов я делаю следующий вызов в цикле

DocumentContext documentContext;
    do {
        documentContext = tailer.readingDocument();
        currentOffset = documentContext.index();
        System.out.println("Current offset: " + currentOffset);
    } while (!documentContext.isData());

Что я наблюдаю, так это то, что переменная currentOffset не меняется, и через некоторое время (кажется, в зависимости от размера полезной нагрузки) цикл становится бесконечным, и текущее смещение имеет сумасшедшие значения. Выход (укороченный) для первого цикла составляет

Writing 0
your data was store to index=76385993359360
Writing 1
your data was store to index=76385993359361
Writing 2
your data was store to index=76385993359362
Writing 3
your data was store to index=76385993359363
Writing 4
your data was store to index=76385993359364
Writing 5
your data was store to index=76385993359365
Writing 6
your data was store to index=76385993359366
Writing 7
your data was store to index=76385993359367
Writing 8
your data was store to index=76385993359368
Writing 9
your data was store to index=76385993359369
Writing 10
your data was store to index=76385993359370
Writing 11
your data was store to index=76385993359371
Writing 12
your data was store to index=76385993359372
Writing 13
your data was store to index=76385993359373
Writing 14
your data was store to index=76385993359374
Writing 15
your data was store to index=76385993359375
Writing 16
your data was store to index=76385993359376
Writing 17
your data was store to index=76385993359377
Writing 18
your data was store to index=76385993359378
Writing 19
your data was store to index=76385993359379
Writing 20
your data was store to index=76385993359380
Writing 21
your data was store to index=76385993359381
Writing 22
your data was store to index=76385993359382
Writing 23
your data was store to index=76385993359383
Writing 24
your data was store to index=76385993359384
Writing 25
your data was store to index=76385993359385
Writing 26
your data was store to index=76385993359386

А для второго цикла

Reading 0
Current offset: 76385993359360
Reading 1
Current offset: 76385993359360
Reading 2
Current offset: 76385993359360
Reading 3
Current offset: 76385993359360
Reading 4
Current offset: 76385993359360
Reading 5
Current offset: 76385993359360
Reading 6
Current offset: 76385993359360
Reading 7
Current offset: 76385993359360
Reading 8
Current offset: 76385993359360
Reading 9
Current offset: 76385993359360
Reading 10
Current offset: 76385993359360
Reading 11
Current offset: 76385993359360
Reading 12
Current offset: 76385993359360
Reading 13
Current offset: 76385993359360
Reading 14
Current offset: 76385993359360
Reading 15
Current offset: 76385993359360
Reading 16
Current offset: 76385993359360
Reading 17
Current offset: 76385993359360
Reading 18
Current offset: 76385993359360
Reading 19
Current offset: 76385993359360
Reading 20
Current offset: 76385993359360
Reading 21
Current offset: 76385993359360
Reading 22
Current offset: 76385993359360
Reading 23
Current offset: 76385993359360
Reading 24
Current offset: 76385993359360
Reading 25
Current offset: -9223372036854775808

Я что-то делаю не так? Может тогда кто-нибудь подскажет мне правильное использование?

Большое спасибо!

Редактировать: добавлен минимальный рабочий пример

Следующий юнит-тест не пройден для меня.

@Test
public void fails() throws Exception {
    String basePath = System.getProperty("java.io.tmpdir");
    String path = Files.createTempDirectory(Paths.get(basePath), "chronicle-")
            .toAbsolutePath()
            .toString();
    logger.info("Using temp path '{}'", path);

    SingleChronicleQueue chronicleQueue = SingleChronicleQueueBuilder
            .single()
            .path(path)
            .build();

    // Create Appender
    ExcerptAppender appender = chronicleQueue.acquireAppender();

    // Create Tailer
    ExcerptTailer tailer = chronicleQueue.createTailer();
    tailer.toStart();

    int numberOfRecords = 10;

    // Write
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Writing " + i);
        try (final DocumentContext dc = appender.writingDocument()) {
            dc.wire().write(() -> "msg").text("Hello World!");
            System.out.println("your data was store to index=" + dc.index());
        } catch (Exception e) {
            logger.warn("Unable to store value to chronicle", e);
        }
    }
    // Read
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Reading " + i);
        DocumentContext documentContext = tailer.readingDocument();
        long currentOffset = documentContext.index();
        System.out.println("Current offset: " + currentOffset);

        Wire wire = documentContext.wire();

        if (wire != null) {
            String msg = wire
                    .read("msg")
                    .text();
        }
    }

    chronicleQueue.close();
} 

Вывод

Writing 0
your data was store to index=76385993359360
Writing 1
your data was store to index=76385993359361
Writing 2
your data was store to index=76385993359362
Writing 3
your data was store to index=76385993359363
Writing 4
your data was store to index=76385993359364
Writing 5
your data was store to index=76385993359365
Writing 6
your data was store to index=76385993359366
Writing 7
your data was store to index=76385993359367
Writing 8
your data was store to index=76385993359368
Writing 9
your data was store to index=76385993359369
Writing 10
your data was store to index=76385993359370
Reading 0
Current offset: 76385993359360
Reading 1
Current offset: 76385993359360
Reading 2
Current offset: 76385993359360
Reading 3
Current offset: 76385993359360
Reading 4
Current offset: -9223372036854775808
Reading 5
Current offset: -9223372036854775808
Reading 6
Current offset: -9223372036854775808
Reading 7
Current offset: -9223372036854775808
Reading 8
Current offset: -9223372036854775808
Reading 9
Current offset: -9223372036854775808
Reading 10
Current offset: -9223372036854775808

Ответы [ 2 ]

0 голосов
/ 26 декабря 2018

Использование DocumentContext предназначено для использования в качестве одного из интерфейсов более низкого уровня и не всем по вкусу. Я предпочитаю использовать подход MethodReader / MethodWriter, если у вас нет причин работать на более низком уровне.

@Test
public void works() {
    String path = OS.TMP + "/chronicle-" + System.nanoTime();
    System.out.println("Using temp path " + path);

    try (SingleChronicleQueue queue = SingleChronicleQueueBuilder
            .single()
            .path(path)
            .build()) {

        ExcerptAppender appender = queue.acquireAppender();
        Messager messager = appender.methodWriter(Messager.class);

        int numberOfRecords = 10;

        // Write
        for (int i = 0; i <= numberOfRecords; i++) {
            System.out.print("Writing " + i);
            messager.msg("Hello World!");
            System.out.println(", your data was stored at index=" + appender.lastIndexAppended());
        }

        ExcerptTailer tailer = queue.createTailer();
        MethodReader reader = tailer.methodReader((Messager) msg -> {
            System.out.println("Current offset: " + tailer.index()
                    + " msg: " + msg);
        });

        // Read
        while (reader.readOne()) {
            // busy wait.
        }
    }
}

Это печатает

Using temp path C:\Users\peter\AppData\Local\Temp\/chronicle-412979753710181
[main] DEBUG net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\metadata.cq4t took 15.418 ms.
[main] DEBUG net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4 took 25.061 ms.
Writing 0, your data was stored at index=76841259892736
Writing 1, your data was stored at index=76841259892737
Writing 2, your data was stored at index=76841259892738
Writing 3, your data was stored at index=76841259892739
Writing 4, your data was stored at index=76841259892740
Writing 5, your data was stored at index=76841259892741
Writing 6, your data was stored at index=76841259892742
Writing 7, your data was stored at index=76841259892743
Writing 8, your data was stored at index=76841259892744
Writing 9, your data was stored at index=76841259892745
Writing 10, your data was stored at index=76841259892746

Current offset: 76841259892736 msg: Hello World!
Current offset: 76841259892737 msg: Hello World!
Current offset: 76841259892738 msg: Hello World!
Current offset: 76841259892739 msg: Hello World!
Current offset: 76841259892740 msg: Hello World!
Current offset: 76841259892741 msg: Hello World!
Current offset: 76841259892742 msg: Hello World!
Current offset: 76841259892743 msg: Hello World!
Current offset: 76841259892744 msg: Hello World!
Current offset: 76841259892745 msg: Hello World!
Current offset: 76841259892746 msg: Hello World!
[main] DEBUG net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4
[main] DEBUG net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released C:\Users\peter\AppData\Local\Temp\chronicle-412979753710181\20181226.cq4

ПРИМЕЧАНИЕ. Здесь записываются те же данные, что и в исходном сообщении.

Преимущество использования этого интерфейсного подхода состоит в том, что вы можете полностью реализовать свой бизнес-компонент, используя интерфейсы методов с DTO, и вообще не использовать Chronicle (или на транспорте). Это упрощает тестирование бизнес-логики при удалении транспорта из тестов.

0 голосов
/ 11 сентября 2018

Я нашел ответ сам, используя предложение от @PeterLawrey и оборачивая контекст документа в try-with-resources. Это решает проблему. Смотрите исправленный фрагмент ниже

@Test
public void works() throws Exception {
    String basePath = System.getProperty("java.io.tmpdir");
    String path = Files.createTempDirectory(Paths.get(basePath), "chronicle-")
            .toAbsolutePath()
            .toString();
    logger.info("Using temp path '{}'", path);

    SingleChronicleQueue chronicleQueue = SingleChronicleQueueBuilder
            .single()
            .path(path)
            .build();

    // Create Appender
    ExcerptAppender appender = chronicleQueue.acquireAppender();

    // Create Tailer
    ExcerptTailer tailer = chronicleQueue.createTailer();
    tailer.toStart();

    int numberOfRecords = 10;

    // Write
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Writing " + i);
        try (final DocumentContext dc = appender.writingDocument()) {
            dc.wire().write(() -> "msg").text("Hello World!");
            System.out.println("your data was store to index=" + dc.index());
        } catch (Exception e) {
            logger.warn("Unable to store value to chronicle", e);
        }
    }
    // Read
    for (int i = 0; i <= numberOfRecords; i++) {
        System.out.println("Reading " + i);
        try (DocumentContext documentContext = tailer.readingDocument()) {
            long currentOffset = documentContext.index();
            System.out.println("Current offset: " + currentOffset);

            Wire wire = documentContext.wire();

            if (wire != null) {
                String msg = wire
                        .read("msg")
                        .text();
            }
        }
    }

    chronicleQueue.close();
}

, который производит ожидаемый результат

Writing 0
your data was store to index=76385993359360
Writing 1
your data was store to index=76385993359361
Writing 2
your data was store to index=76385993359362
Writing 3
your data was store to index=76385993359363
Writing 4
your data was store to index=76385993359364
Writing 5
your data was store to index=76385993359365
Writing 6
your data was store to index=76385993359366
Writing 7
your data was store to index=76385993359367
Writing 8
your data was store to index=76385993359368
Writing 9
your data was store to index=76385993359369
Writing 10
your data was store to index=76385993359370
Reading 0
Current offset: 76385993359360
Reading 1
Current offset: 76385993359361
Reading 2
Current offset: 76385993359362
Reading 3
Current offset: 76385993359363
Reading 4
Current offset: 76385993359364
Reading 5
Current offset: 76385993359365
Reading 6
Current offset: 76385993359366
Reading 7
Current offset: 76385993359367
Reading 8
Current offset: 76385993359368
Reading 9
Current offset: 76385993359369
Reading 10
Current offset: 76385993359370

Надеюсь, это поможет кому-то еще.

...