Kafka Streams не удалось удалить каталог состояния - DirectoryNotEmptyException - PullRequest
1 голос
/ 23 мая 2019

Я заметил исключение stream-thread [x-CleanupThread] Не удалось удалить каталог состояний с нашим приложением kafka streams.Приложение использует оконное хранилище состояний и определяется как:

Stores.windowStoreBuilder(
    Stores.persistentWindowStore(
        storeName,
        retentionPeriod,
        retentionWindowSize,
        false),
    Serdes.String(),
    Serdes.String()).withCachingEnabled();  

Это не проблема теста с использованием драйвера топологии.Это в реальном развернутом потоковом приложении.Каждые десять минут он будет пытаться удалить каталог и завершится ошибкой с трассировкой стека ниже.Проверяя каталог, он выглядит пустым с помощью ls -al.Я также попытался изменить права доступа к каталогу с помощью chmod 777, но безуспешно.

Трассировка стека:

 java.nio.file.DirectoryNotEmptyException: /data/consumer/0_17
        at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
        at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
        at java.nio.file.Files.delete(Files.java:1126)
        at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:763)
        at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:746)
        at java.nio.file.Files.walkFileTree(Files.java:2688)
        at java.nio.file.Files.walkFileTree(Files.java:2742)
        at org.apache.kafka.common.utils.Utils.delete(Utils.java:746)
        at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:290)
        at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:253)
        at org.apache.kafka.streams.KafkaStreams$2.run(KafkaStreams.java:794)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
...