Я заметил исключение stream-thread [x-CleanupThread] Не удалось удалить каталог состояний с нашим приложением kafka streams.Приложение использует оконное хранилище состояний и определяется как:
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName,
retentionPeriod,
retentionWindowSize,
false),
Serdes.String(),
Serdes.String()).withCachingEnabled();
Это не проблема теста с использованием драйвера топологии.Это в реальном развернутом потоковом приложении.Каждые десять минут он будет пытаться удалить каталог и завершится ошибкой с трассировкой стека ниже.Проверяя каталог, он выглядит пустым с помощью ls -al.Я также попытался изменить права доступа к каталогу с помощью chmod 777, но безуспешно.
Трассировка стека:
java.nio.file.DirectoryNotEmptyException: /data/consumer/0_17
at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:763)
at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:746)
at java.nio.file.Files.walkFileTree(Files.java:2688)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:746)
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:290)
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:253)
at org.apache.kafka.streams.KafkaStreams$2.run(KafkaStreams.java:794)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)