Не удалось удалить каталог состояний в IDE для приложения Kafka Stream - PullRequest
0 голосов
/ 30 мая 2018

Я занимаюсь разработкой простого приложения Kafka Stream, которое извлекает сообщения из темы и помещает ее в другую тему после преобразования.Я использую Intelij для моей разработки.

Когда я отлаживаю / запускаю это приложение, оно отлично работает, если моя IDE и сервер Kafka, сидящий на ЖЕ машине

(т.е. с BOOTSTRAP_SERVERS_CONFIG =localhost: 9092 и SCHEMA_REGISTRY_URL_CONFIG = localhost: 8081)

Однако, когда я пытаюсь использовать другую машину для разработки

(т.е. с BOOTSTRAP_SERVERS_CONFIG = XXX.XX.XXX: 9092 и SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081, где XXX.XXX.XXX - это ip-адрес моей Kafka),

процесс отладки запускается без проблем на первомвремя .Однако, когда я запускаю второй раз после сброса смещения, я получил следующую ошибку:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

Если я изменил my_application_id на my_application_id2 и запустил его, он снова работает в 1-й раз, носнова получаю сообщение об ошибке, если я запускаю его снова.

В моем последнем предложении в моем заявлении содержится следующий код:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Любой совет, как решить эту проблему?

ОБНОВЛЕНИЕ:

Я проверил каталог состояний, созданный на моей машине для разработки (платформа Windows), и, если я удалю этот каталог вручную перед повторным запуском, ошибки не будет найдено.Я попытался запустить мою IDE с правами администратора, потому что я думаю, что это может быть связано с разрешением для папки.Однако это не помогает.

Полный стек для справки:

ИНФОРМАЦИЯ Версия Kafka: 1.1.0 (org.apache.kafka.common.utils.AppInfoParser: 109)ИНФОРМАЦИЯ Kafka commitId: fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser: 110) INFO stream-thread [main] Удаление каталога состояний 0_0 для задачи 0_0 как пользователя, вызывающего очистку.(org.apache.kafka.streams.processor.internals.StateDirectory: 281) Отключен от целевой виртуальной машины, адрес: «127.0.0.1:16552», транспорт: «socket» Исключение в потоке «main» org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C: \ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:2) в org.apache.kafka.streams.KafkaStreams.cleanUp (KafkaStreams.java:931) в com.macroviewhk.financialreport.simpleStream.start (simpleStream.java:60) в com.macroviewhk.financialreport.simpleStream.main (simpleStream).java: 45) Вызвано: java.nio.file.DirectoryNotEmptyException: C: \ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete (WindowsFileSystemProvider.java:266) в sun.nio.fs.AbstractFileSystemProvider.delete (AbstractFileSystemProvider.java:103) в java.nio.file.Files.delete (Files.java:1126) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:651) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:634) в java.nio.file.Files.walkFileTree (Файлы.java: 2688) в java.nio.file.Files.walkFileTree (Files.java:2742) в org.apache.kafka.common.utils.Utils.delete (Utils.java:634) Поток потока с ошибкой [main] Сбойудалить директорию состояния.(org.apache.kafka.streams.processor.internals.StateDirectory: 297) в org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks (StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:228) в sun.nio.fs.WindowsFileSystemProvider.implDelete (WindowsFileSystemPide266) ... еще 3 в sun.nio.fs.AbstractFileSystemProvider.delete (AbstractFileSystemProvider.java:103) в java.nio.file.Files.delete (Files.java:1126)в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:651) в org.apache.kafka.common.utils.Utils $ 1.postVisitDirectory (Utils.java:634) в java.nio.file.Files.walkFileTree (Files.java:2688) по адресу java.nio.file.Files.walkFileTree (Files.java:2742) по адресу org.apache.kafka.common.utils.Utils.delete (Utils.java:634) по адресуorg.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks (StateDirectory.java:287) в org.apache.kafka.streams.processor.internals.StateDirectory.clean (StateDirectory.java:228) в org.apache.kafka.streams.KafkaStreams.cleanUp (KafkaStreams.java:931) в com.macroviewhk.financialreport.simpleStream.start (simpleStream.java:60) в com.macroviewhk.financialreport.simpleStream.main (simpleStream.java:45)

ОБНОВЛЕНИЕ 2: после очередной подробной проверки строка ниже выдает IOException

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

Эта строка находится по адресу kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

Может быть, это проблема с Windowsсистема (извините, что я не опытный программист JAVA).

Ответы [ 2 ]

0 голосов
/ 29 мая 2019

Для googlers ..

В настоящее время я использую этот код Scala для помощи пользователям Windows в удалении хранилища состояний.

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}
0 голосов
/ 11 июня 2018

Я согласен с @ ideano1, который, по-видимому, связан с https://issues.apache.org/jira/browse/KAFKA-6647 - то, что вы можете попробовать, это явно вызвать KafkaStreams#cleanUp() между тестами.Непонятно, почему в Window-OS есть проблемы.Atm, все тестирование происходит в Linux.

...