Я реализую проект потока kafka с использованием Processor API и Kafka StreamDSL. Моя функция процесса в процессоре
@Override
public void process(final String key, final T event) {
keyValueStore.put(key, event);
}
Моя топология:
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder<KeyValueStore<String, T>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
new EventSerializer(streamProperties()),
new EventDeserializer(streamProperties())));
builder.addStateStore(storeBuilder);
final KStream<String, T> stream = builder.stream(inputTopic);
stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
stream.to(outputTopic);
return builder.build();
}
И, наконец, это мой собственный класс EventSerializer:
public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
implements Serializer<T> {
private final KafkaAvroSerializer inner;
public EventSerializer(Map<String, ?> properties) {
inner = new KafkaAvroSerializer();
configure(properties, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
}
@Override
public byte[] serialize(final String topic, final T record) {
return inner.serialize(topic, record);
}
}
Когда процессор помещает событие в keyValueStore, я получил ошибку io.confluent.rest.exceptions.RestNotFoundException: Subject not found. После некоторой отладки я понял, что это из-за сериализатора имеет проблемы при сериализации событий. Topi c в функции public byte[] serialize(final String topic, final T record)
равен application id-store-changelog
. Это внутреннее поведение кафки, хотя я не знаю почему. Serialzer не может найти схему для этого комбинированного topi c, что вызывает ошибку. Нужно ли мне регистрировать схему для этого комбинированного topi c или есть ли способ передать реальный клиент topi c в сериализатор, который уже зарегистрировал схему?