В рамках нашей логики приложения мы используем хранилище состояний Kafka Streams для поиска range , данные загружаются из раздела Kafka с использованием метода builder.table()
.
Проблема в том, что ключ исходной темы сериализуется как JSON и плохо подходит для сравнения двоичных ключей, используемых внутри хранилища состояний на основе RocksDB.
Мы надеялись использовать отдельный serde для ключей, передав его в Materialized.as()
.Тем не менее, похоже, что реализация потоков сбрасывает все, что передано в исходные serdes, используемые для загрузки из темы таблицы.
Это то, что я вижу во внутренних компонентах построителя потоков:
public synchronized <K, V> KTable<K, V> table(final String topic,
final Consumed<K, V> cons,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
}
Кто-нибудь знает, почему это так, и возможно ли использовать другой serde для хранилища состояний DSL?
Пожалуйста, не предлагайте использовать Processor API, этот маршрут хорошо изучен.Я хотел бы избегать записи процессора и пользовательского хранилища состояний каждый раз, когда мне нужно скопировать данные перед их сохранением в хранилище состояний.
После некоторого поиска источников потоков я обнаружил, что могу передать пользовательский Materialized.as
в фильтр с всегда истинным предикатом.Но это пахнет немного хакерски.
Это мой код, который, к сожалению, не работает, как мы надеялись, из-за "сброса serdes", описанного выше.
Serde<Value> valueSerde = new JSONValueSerde()
KTable<Key, Value> table = builder.table(
tableTopic,
Consumed.with(new JSONKeySerde(), valueSerde)
Materialized.as(cacheStoreName)
.withKeySerde(new BinaryComparisonsCompatibleKeySerde())
.withValueSerde(valueSerde)
)