Я занимаюсь разработкой простого приложения Kafka Stream, которое извлекает сообщения из темы и помещает ее в другую тему после преобразования.Я использую Intelij для моей разработки.
Когда я отлаживаю / запускаю это приложение, оно отлично работает, если моя IDE и сервер Kafka, сидящий на ЖЕ машине
(т.е. с BOOTSTRAP_SERVERS_CONFIG =localhost: 9092 и SCHEMA_REGISTRY_URL_CONFIG = localhost: 8081)
Однако, когда я пытаюсь использовать другую машину для разработки
(т.е. с BOOTSTRAP_SERVERS_CONFIG = XXX.XX.XXX: 9092 и SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081, где XXX.XXX.XXX - это ip-адрес моей Kafka),
процесс отладки запускается без проблем на первомвремя .Однако, когда я запускаю второй раз после сброса смещения, я получил следующую ошибку:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
Если я изменил my_application_id
на my_application_id2
и запустил его, он снова работает в 1-й раз, носнова получаю сообщение об ошибке, если я запускаю его снова.
В моем последнем предложении в моем заявлении содержится следующий код:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Любой совет, как решить эту проблему?
ОБНОВЛЕНИЕ:
Я проверил каталог состояний, созданный на моей машине для разработки (платформа Windows), и, если я удалю этот каталог вручную перед повторным запуском, ошибки не будет найдено.Я попытался запустить мою IDE с правами администратора, потому что я думаю, что это может быть связано с разрешением для папки.Однако это не помогает.
Полный стек для справки:
ИНФОРМАЦИЯ Версия Kafka: 1.1.0 (org.apache.kafka.common.utils.AppInfoParser: 109)ИНФОРМАЦИЯ Kafka commitId: fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser: 110) INFO stream-thread [main] Удаление каталога состояний 0_0 для задачи 0_0 как пользователя, вызывающего очистку.(org.apache.kafka.streams.processor.internals.StateDirectory: 281) Отключен от целевой виртуальной машины, адрес: «127.0.0.1:16552», транспорт: «socket» Исключение в потоке «main» org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C: \ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:2) в org.apache.kafka.streams.KafkaStreams.cleanUp (KafkaStreams.java:931) в com.macroviewhk.financialreport.simpleStream.start (simpleStream.java:60) в com.macroviewhk.financialreport.simpleStream.main (simpleStream).java: 45) Вызвано: java.nio.file.DirectoryNotEmptyException: C: \ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete (WindowsFileSystemProvider.java:266) в sun.nio.fs.AbstractFileSystemProvider.delete (AbstractFileSystemProvider.java:103) в java.nio.file.Files.delete (Files.java:1126) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:651) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:634) в java.nio.file.Files.walkFileTree (Файлы.java: 2688) в java.nio.file.Files.walkFileTree (Files.java:2742) в org.apache.kafka.common.utils.Utils.delete (Utils.java:634) Поток потока с ошибкой [main] Сбойудалить директорию состояния.(org.apache.kafka.streams.processor.internals.StateDirectory: 297) в org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks (StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:228) в sun.nio.fs.WindowsFileSystemProvider.implDelete (WindowsFileSystemPide266) ... еще 3 в sun.nio.fs.AbstractFileSystemProvider.delete (AbstractFileSystemProvider.java:103) в java.nio.file.Files.delete (Files.java:1126)в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:651) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:634) в java.nio.file.Files.walkFileTree (Files.java:2688) по адресу java.nio.file.Files.walkFileTree (Files.java:2742) по адресу org.apache.kafka.common.utils.Utils.delete (Utils.java:634) по адресуorg.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks (StateDirectory.java:287) в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:228) в org.apache.kafka.streams.KafkaStreams.cleanUp (KafkaStreams.java:931) в com.macroviewhk.financialreport.simpleStream.start (simpleStream.java:60) в com.macroviewhk.financialreport.simpleStream.main (simpleStream.java:45)
ОБНОВЛЕНИЕ 2: после очередной подробной проверки строка ниже выдает IOException
Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
Эта строка находится по адресу kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class
Может быть, это проблема с Windowsсистема (извините, что я не опытный программист JAVA).