Невозможно прочитать данные kafka-stream из kafka-потока debezium-postgres - PullRequest
0 голосов
/ 11 октября 2018

Я запустил коннектор kafka, используя следующую команду:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-postgres/connect-postgres.properties 

Порядок сериализации в connect-avro-standalone.properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Я создал бэкэнд Java, которыйслушайте эту тему потока kafka и сможете получать данные из postgres при каждом добавлении / обновлении / удалении.
Но данные поступают в каком-то неизвестном формате кодировки, и поэтому я не могу правильно их прочитать.
Вот соответствующий фрагмент кода:

properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());

StreamsBuilder streamsBuilder = new StreamsBuilder();

final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteSerde = Serdes.ByteArray();

streamsBuilder.stream(Pattern.compile(getTopic()), Consumed.with(stringSerde, byteSerde))
.mapValues(data -> {
  System.out.println("->"+new String(data));
  return data;
});

Я запутался в том, где и что мне нужно изменить;в опоре разъема avro или в коде стороны java

1 Ответ

0 голосов
/ 11 октября 2018

Ваша конфигурация Kafka Connect в данном случае означает, что сообщения по теме Kafka будут сериализованы в Avro:

value.converter=io.confluent.connect.avro.AvroConverter

Это означает, что вам необходимо десериализовать с помощью Avro в приложении Streams.Подробнее смотрите здесь: https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro

...