Нужно ли мне регистрировать схему для журнала изменений потока kafka topi c в реестре схем? - PullRequest
0 голосов
/ 05 августа 2020

Я реализую проект потока kafka с использованием Processor API и Kafka StreamDSL. Моя функция процесса в процессоре

@Override
public void process(final String key, final T event) {
  keyValueStore.put(key, event);
}

Моя топология:

 protected Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
    StoreBuilder<KeyValueStore<String, T>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
            new EventSerializer(streamProperties()),
            new EventDeserializer(streamProperties())));
    builder.addStateStore(storeBuilder);

    final KStream<String, T> stream = builder.stream(inputTopic);
    stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
    stream.to(outputTopic);

    return builder.build();
  }

И, наконец, это мой собственный класс EventSerializer:

public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
    implements Serializer<T> {
  private final KafkaAvroSerializer inner;

  public EventSerializer(Map<String, ?> properties) {
    inner = new KafkaAvroSerializer();
    configure(properties, false);
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
  }

  @Override
  public byte[] serialize(final String topic, final T record) {
    return inner.serialize(topic, record);
  }
}

Когда процессор помещает событие в keyValueStore, я получил ошибку io.confluent.rest.exceptions.RestNotFoundException: Subject not found. После некоторой отладки я понял, что это из-за сериализатора имеет проблемы при сериализации событий. Topi c в функции public byte[] serialize(final String topic, final T record) равен application id-store-changelog. Это внутреннее поведение кафки, хотя я не знаю почему. Serialzer не может найти схему для этого комбинированного topi c, что вызывает ошибку. Нужно ли мне регистрировать схему для этого комбинированного topi c или есть ли способ передать реальный клиент topi c в сериализатор, который уже зарегистрировал схему?

1 Ответ

1 голос
/ 07 августа 2020

Когда у вас есть new KafkaAvroSerializer();, по умолчанию он указывает на localhost:8081 для реестра схемы.

Вам не нужно регистрироваться (хотя вы можете), поскольку производитель делает это как часть logi сериализации c с inner.serialize

Примечание: расширение KafkaAvroSerializer может иметь больше смысла

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...