Разное serde для государственного магазина Kafka Streams KTable - PullRequest
0 голосов
/ 11 мая 2018

В рамках нашей логики приложения мы используем хранилище состояний Kafka Streams для поиска range , данные загружаются из раздела Kafka с использованием метода builder.table().

Проблема в том, что ключ исходной темы сериализуется как JSON и плохо подходит для сравнения двоичных ключей, используемых внутри хранилища состояний на основе RocksDB.

Мы надеялись использовать отдельный serde для ключей, передав его в Materialized.as().Тем не менее, похоже, что реализация потоков сбрасывает все, что передано в исходные serdes, используемые для загрузки из темы таблицы.

Это то, что я вижу во внутренних компонентах построителя потоков:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> cons,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
    Objects.requireNonNull(topic, "topic can't be null");
    Objects.requireNonNull(consumed, "consumed can't be null");
    Objects.requireNonNull(materialized, "materialized can't be null");
    materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
    return internalStreamsBuilder.table(topic,
                                        new ConsumedInternal<>(consumed),
                                        new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
}

Кто-нибудь знает, почему это так, и возможно ли использовать другой serde для хранилища состояний DSL?

Пожалуйста, не предлагайте использовать Processor API, этот маршрут хорошо изучен.Я хотел бы избегать записи процессора и пользовательского хранилища состояний каждый раз, когда мне нужно скопировать данные перед их сохранением в хранилище состояний.

После некоторого поиска источников потоков я обнаружил, что могу передать пользовательский Materialized.as в фильтр с всегда истинным предикатом.Но это пахнет немного хакерски.

Это мой код, который, к сожалению, не работает, как мы надеялись, из-за "сброса serdes", описанного выше.

Serde<Value> valueSerde = new JSONValueSerde()
KTable<Key, Value> table = builder.table(
    tableTopic,
    Consumed.with(new JSONKeySerde(), valueSerde)
    Materialized.as(cacheStoreName)
        .withKeySerde(new BinaryComparisonsCompatibleKeySerde())
        .withValueSerde(valueSerde)
)

1 Ответ

0 голосов
/ 11 мая 2018

Код работает по дизайну. С точки зрения потоков, нет никакой причины использовать другой Serde для хранилища для чтения данных из темы, потому что это, как известно, те же самые данные. Таким образом, если вы не используете Serdes по умолчанию из StreamsConfig, достаточно указать Serde один раз (в Consumed), и нет необходимости указывать его снова в Materialized.

Для вашего особого случая вы можете прочитать тему в виде потока и выполнить «фиктивную агрегацию», которая просто возвращает последнее значение для каждой записи (вместо вычисления фактического агрегата). Это позволяет вам указать другой Serde для типа результата.

...