Я недавно обновился с kafka 0.10.1 до 1.1.0.
Мое потоковое приложение создает потоки, подписываясь на изменения из нашей базы данных, используя confluent connect, выполняет некоторые вычисления и затем публикует свой собственный поток / тему.
При запуске приложения я пытаюсь получитькаждый из потокового хранилища приложение публикует.Этот код просто пытается получить хранилище с помощью метода KafkaStreams.store в цикле try / catch (я пытаюсь 300 раз дать время потока в случае, если он перебалансируется или действительно мигрирует).Все это работало нормально для kafka 0.10.2
После обновления до kafka 1.1.0 приложение запускается в первый раз нормально.Однако, если я пытаюсь перезапустить приложение, в тех случаях, когда поток использует несколько тем из соединения, такие потоки всегда выдают InvalidStateStoreException.Это не происходит для потоков, которые подписываются на одну тему подключения.Чтобы исправить, я должен удалить журналы и сохранить, а затем перезапустить мое потоковое приложение, оно работает нормально.Но мне всегда приходится в значительной степени стирать журналы и сохранять каждый раз, когда я перезагружаюсь.
Я немного отладил в исходном коде и обнаружил, что проблема заключается в этом вызове в org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
if (!streamThread.isRunningAndNotRebalancing()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
final List<T> stores = new ArrayList<>();
for (Task streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
stores.add((T) store);
}
}
return stores;
}
Для потоков, которые используют несколько тем подключения и создают один поток / тему, при перезапуске приложения приведенный выше код не находит хранилище для темы, которую предполагается опубликовать (дажехотя он должен существовать, если приложение запускается и работает нормально при первом запуске после очистки журналов и сохранения (сейчас я вручную удаляю эти папки)).Однако, что еще более странно, это то, что, несмотря на то, что он не находит магазин, он все еще получает темы, связанные с подключением, и производит вычисленный поток, по-видимому, просто отлично.обновить