ошибка kafka после обновления до 1.1.0: «хранилище состояний ... возможно, перенесено в другой экземпляр» - PullRequest
0 голосов
/ 26 апреля 2018

Я недавно обновился с kafka 0.10.1 до 1.1.0.

Мое потоковое приложение создает потоки, подписываясь на изменения из нашей базы данных, используя confluent connect, выполняет некоторые вычисления и затем публикует свой собственный поток / тему.

При запуске приложения я пытаюсь получитькаждый из потокового хранилища приложение публикует.Этот код просто пытается получить хранилище с помощью метода KafkaStreams.store в цикле try / catch (я пытаюсь 300 раз дать время потока в случае, если он перебалансируется или действительно мигрирует).Все это работало нормально для kafka 0.10.2

После обновления до kafka 1.1.0 приложение запускается в первый раз нормально.Однако, если я пытаюсь перезапустить приложение, в тех случаях, когда поток использует несколько тем из соединения, такие потоки всегда выдают InvalidStateStoreException.Это не происходит для потоков, которые подписываются на одну тему подключения.Чтобы исправить, я должен удалить журналы и сохранить, а затем перезапустить мое потоковое приложение, оно работает нормально.Но мне всегда приходится в значительной степени стирать журналы и сохранять каждый раз, когда я перезагружаюсь.

Я немного отладил в исходном коде и обнаружил, что проблема заключается в этом вызове в org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider

    public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}

Для потоков, которые используют несколько тем подключения и создают один поток / тему, при перезапуске приложения приведенный выше код не находит хранилище для темы, которую предполагается опубликовать (дажехотя он должен существовать, если приложение запускается и работает нормально при первом запуске после очистки журналов и сохранения (сейчас я вручную удаляю эти папки)).Однако, что еще более странно, это то, что, несмотря на то, что он не находит магазин, он все еще получает темы, связанные с подключением, и производит вычисленный поток, по-видимому, просто отлично.обновить

...