KafkaStreams создает исключение FileNoFoundException для .checkpoint.tmp - PullRequest
1 голос
/ 27 апреля 2019

Я локально запускаю приложение (Spring Boot и Spring Cloud Stream, Kafka Binder и Kafka Streams Binder), которое использует и создает несколько тем, из которых все имеют 4 раздела. У меня есть некоторая потоковая обработка внутри с использованием KafkaStreams. Все выглядит хорошо после запуска приложения в нескольких случаях, но в тот момент, когда один экземпляр отключается, другие продолжают непрерывно генерировать исключение FileNotFoundException.

При запуске 3 экземпляров, сразу после запуска я вижу:

  • Для первого
    current active tasks: [0_0, 1_0]
    current standby tasks: []
    previous active tasks: [0_0, 1_0, 0_1, 1_1]
  • Для второго
    current active tasks: [0_2, 1_2, 0_3]
    current standby tasks: []
    previous active tasks: [0_2, 1_2, 0_3, 1_3]
  • И для последнего:
    current active tasks: [0_1, 1_1, 1_3]
    current standby tasks: []
    previous active tasks: []

Я публикую некоторые сообщения в темах, которые обрабатываются потоками Кафки, и все работает отлично. Но когда я закрываю первый экземпляр, второй постоянно выбрасывает

java.io.FileNotFoundException: /tmp/kafka-streams/my-service/1_2/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_144]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_144]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_144]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_144]
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:397) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) [kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) [kafka-streams-2.0.1.jar:na]

и третий делает то же самое, только каталог один раз:

/ tmp / kafka-streams / my-service / 1_1 /. Checkpoint.tmp

и один раз:

/ TMP / Кафка-потоки / мой-сервис / 1_3 /. Checkpoint.tmp

В tmp / kafka-streams / my-service я вижу, что на самом деле после закрытия первого экземпляра остаются только каталоги 0_0 0_1 0_2 0_3 1_0.

Это не приводит к сбою приложения, и из того, что я вижу, состояние доступно из запущенных экземпляров (но, возможно, я что-то здесь упускаю). Кто-нибудь знает, почему генерируется это исключение, какое влияние оно может оказать и что я должен изменить, чтобы исправить это?

...