Потоки Кафки - определение пользовательского реляционного / Non_Key_Value StateStore с отказоустойчивостью - PullRequest
0 голосов
/ 28 октября 2018

Я пытаюсь реализовать источник событий, используя kafka.

Мое видение для приложения потокового процессора - это типичное 3-слойное приложение Spring, в котором:

  • «презентация»слой заменен (реализован?) API потоков Kafka.
  • Уровень бизнес-логики используется процессором API в топологии.
  • Кроме того, БД является реляционной H2, In-база данных памяти, доступ к которой осуществляется через репозитории Spring Data JPA.В репозиториях также реализованы необходимые интерфейсы, чтобы они могли быть зарегистрированы как хранилища состояний Kafka для использования преимуществ (восстановление и отказоустойчивость)

Но мне интересно, как мне реализовать пользовательскую часть хранилища состояний?

Я искал И:

  • Есть несколько интерфейсов, таких как StateStore & StoreBuilder.StoreBuilder имеет метод withLoggingEnabled();Но если я включу его, когда произойдет фактическое обновление и регистрация изменений?обычно все примеры являются хранилищами значений ключа, даже для пользовательских.Что делать, если я не хочу значения ключа?Пример в разделе интерактивных запросов в документации kafka просто не сокращает его.

  • Мне известны интерактивные запросы.Но они, кажется, хороши для запросов и не обновлений;как следует из названия.

В хранилище значения ключа записи, которые отправляются в журнал изменений, просты.Но если я не использую значение ключа;когда и как мне сообщить кафке, что мое состояние изменилось?

1 Ответ

0 голосов
/ 28 октября 2018

Вам нужно будет внедрить StateStore для действительного хранилища, которое вы хотите использовать.Этот интерфейс ничего не говорит о хранилище, и вы можете делать все, что захотите.

Вам также нужно реализовать StoreBuilder, который действует как фабрика для создания экземпляров вашего пользовательского хранилища.

MyCustomStore implements StateStore {
    // define any interface you want to present to the user of the store
}

MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
    MyCustomStore builder() {
        // create new instance of MyCustomStore and return it
    }

    // all other methods (except `name()`) are optional
    // eg, you can do a dummy implementation that only returns `this`
}

Сравните: https://docs.confluent.io/current/streams/developer-guide/processor-api.html#implementing-custom-state-stores

Но если я не использую значение ключа;когда и как мне сообщить kafka, что мое состояние изменилось?

Если вы хотите реализовать withLoggingEnabled() (аналогично кешированию), вам нужно будет реализовать это ведение журнала (или кеширование) как частьваш магазин.Потому что Kafka Streams не знает, как работает ваш магазин, и не может обеспечить реализацию для этого.Таким образом, это ваше дизайнерское решение, если ваш магазин поддерживает вход в раздел изменений или нет.И если вы хотите поддерживать ведение журнала, вам нужно придумать дизайн, который отображает обновления хранилища для пар ключ-значение (вы также можете записать несколько для каждого обновления), который вы можете записать в тему журнала изменений и который позволяет воссоздать состояниепри чтении этих записей в разделе журнала изменений.

Получение отказоустойчивого хранилища возможно не только через ведение журнала изменений.Например, вы также можете подключить удаленное хранилище, которое выполняет внутреннюю репликацию и т. Д. И, таким образом, полагается на отказоустойчивые возможности хранилища вместо использования регистрации изменений.Конечно, использование удаленного хранилища подразумевает другие проблемы по сравнению с использованием локального хранилища.

Для хранилищ Kafka Streams по умолчанию ведение журналов и кэширование реализованы в качестве оберток для реального хранилища, что делает его легко подключаемым.Но вы можете реализовать это любым способом, который подходит вашему магазину лучше всего.Для сравнения вы можете проверить следующие классы для хранилища ключей-значений:

Для интерактивных запросов вы вводите соответствующий QueryableStoreType для интеграции вашего пользовательского магазина.Срhttps://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores Вы правы, что интерактивные запросы являются интерфейсом только для чтения для существующих хранилищ, потому что Processors должен отвечать за поддержание хранилищ.Однако ничто не мешает вам открыть свой пользовательский магазин для записей.Однако это сделает ваше приложение по своей сути недетерминированным, потому что, если вы перемотаете входную тему и повторно обработаете ее, оно может вычислить другой результат, в зависимости от того, какие «записи внешнего хранилища» выполняются.Вы должны рассмотреть возможность любой записи в магазин через входные темы.Но это ваше решение.Если вы разрешаете «внешние записи», вам также необходимо убедиться, что они также регистрируются, если вы хотите реализовать ведение журнала.

...