KafkaStreams.cleanUp () не удаляет правильный каталог - PullRequest
0 голосов
/ 29 октября 2018

У меня следующая проблема, может быть, кто-то может подсказать мне, где искать. мой проект: - kafka версии 1.1.1, kafka-streams в приложении spring-boot, предоставил EmbeddedKafka для интеграционных тестов, macosx, jdk 1.8

В моем конфиге потоков есть: - 1 глобальная таблица, - 1 глобальный магазин и - 1 государственный магазин, - 1 процессор и 1 трансформатор для глобального магазина и - 1 преобразователь с пунктуацией (WALL_CLOCK_TIME) для хранилища состояний.

Я ввел CleanupConfig для вызова KafkaStreams.cleanUp () onClose (в моих интеграционных тестах). Это хорошо работает с глобальным хранилищем, но не с хранилищем состояний, которое не восстановлено. Я включил регистрацию на kafka StateDirectory и вот что я вижу. Перед началом моего интеграционного теста:

14:39:58.609 [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-GlobalStreamThread] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-GlobalStreamThread] Acquired global state dir lock

14:39:59.153 [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-StreamThread-1] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-StreamThread-1] Acquired state dir lock for task 2_0

Обратите внимание, что он получил блокировку для задачи 2_0

После интеграционного теста:

14:40:07.079 [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-GlobalStreamThread] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [kstreams-3892e198-bf56-4ac1-bd21-68c580309d4d-GlobalStreamThread] Released global state dir lock

14:40:17.032 [Thread-6] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Acquired state dir lock for task 1_0

14:40:17.032 [Thread-6] INFO  o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Deleting state directory 1_0 for task 1_0 as user calling cleanup.

14:40:17.033 [Thread-6] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Released state dir lock for task 1_0

Обратите внимание, что каталог удаления для задачи 2_0 не называется

После завершения работы EmbeddedKafka это журнал из потоков kafka.

20:36:08.004 [Thread-6] DEBUG o.apache.kafka.streams.KafkaStreams - stream-client [kstreams-6e816551-a066-455d-a619-2af0d23663b4] Stopping Streams client with timeoutMillis = 10000 ms.
20:36:08.004 [Thread-6] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstreams-6e816551-a066-455d-a619-2af0d23663b4] State transition from RUNNING to PENDING_SHUTDOWN
20:36:08.005 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-StreamThread-1] Informed to shut down
20:36:08.005 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
20:36:08.005 [kafka-streams-close-thread] INFO  o.a.k.s.p.i.GlobalStreamThread - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN
20:36:08.036 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] DEBUG o.a.k.s.p.i.GlobalStateManagerImpl - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Flushing all global globalStores registered in the state manager
20:36:08.037 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] INFO  o.a.k.s.p.i.GlobalStreamThread - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Shutting down
20:36:08.038 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] DEBUG o.a.k.s.p.i.GlobalStateManagerImpl - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Closing global storage engine doc-metadata-store
20:36:08.039 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] DEBUG o.a.k.s.p.i.GlobalStateManagerImpl - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Closing global storage engine agent-location-store
20:36:08.040 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Released global state dir lock
20:36:08.040 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] INFO  o.a.k.s.p.i.GlobalStreamThread - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD
20:36:08.040 [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] INFO  o.a.k.s.p.i.GlobalStreamThread - global-stream-thread [kstreams-6e816551-a066-455d-a619-2af0d23663b4-GlobalStreamThread] Shutdown complete
20:36:18.010 [Thread-6] DEBUG o.apache.kafka.streams.KafkaStreams - stream-client [kstreams-6e816551-a066-455d-a619-2af0d23663b4] Cannot transit to NOT_RUNNING within 10000ms
20:36:18.010 [Thread-6] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstreams-6e816551-a066-455d-a619-2af0d23663b4] Streams client cannot stop completely within the timeout
20:36:18.011 [Thread-6] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Acquired state dir lock for task 1_0
20:36:18.012 [Thread-6] INFO  o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Deleting state directory 1_0 for task 1_0 as user calling cleanup.
20:36:18.012 [Thread-6] DEBUG o.a.k.s.p.internals.StateDirectory - stream-thread [Thread-6] Released state dir lock for task 1_0

Что здесь может пойти не так? Заранее спасибо.

...