Сериализация с помощью операции KStream groupBy - PullRequest
0 голосов
/ 10 февраля 2019

Я пытаюсь выполнить операцию подсчета на KStream и сталкиваюсь с некоторыми трудностями в понимании того, как здесь работает сериализация.У меня есть поток, который толкает людей информацию, например, имя, возраст.После использования этого потока я пытаюсь создать таблицу KTable с подсчетом возраста людей.

Ввод: {"name": "abc", "age": "15"}

Выход: 30, 10 20, 4 10, 8 35, 22 ...

Свойства

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "person_processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Процессор

KStream<Object, Person> people = builder.stream("people");
people.print(Printed.<Object, Person>toSysOut().withLabel("consumer-1"));

Выход [потребитель-1]: ноль, [B @ 7e37bab6

Вопрос-1 Я так понимаю, что данные в теме представлены в байтах.Я не устанавливаю никаких Serdes для ключа или значения, чтобы начать с.Преобразует ли KStream входные данные из байтов в Person и печатает здесь адрес Person?

Question-2 Когда я добавляю значение Serdes, указанное ниже, я получаю более значимый вывод.Байтная информация здесь преобразуется в String, а затем в Person?Почему значение теперь печатается правильно?

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

[consumer-1]: null, {"name" : "abc","age" : "15"}

Вопрос-3 Теперь при выполнении подсчета возраста я получаю ошибку времени выполнения при преобразовании строки в персону.Если groupBy устанавливает возраст в качестве Ключа, а счет - как Длинный, почему происходит преобразование Строка в Человека?

KTable<Integer, Long> integerLongKTable = people.groupBy((key, value) -> value.getAge())
    .count();

Exception in thread "person_processor-9ff96b38-4beb-4594-b2fe-ae191bf6b9ff-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to com.example.kafkastreams.KafkaStreamsApplication$Person
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)

Правка-1

После прочтениячерез ответ @Matthias J. Sax я создал PersonSerde с использованием Serializer и DeSerializer из этого местоположения, я получаю это исключение SerializationException ...

https://github.com/apache/kafka/tree/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

static class Person {

    String name;
    String age;

    public Person(String name, String age) {

      this.name = name;
      this.age = age;
    }

    void setName(String name) {

      this.name = name;
    }

    String getName() {

      return name;
    }

    void setAge(String age) {

      this.age = age;
    }

    String getAge() {

      return age;
    }

    @Override
    public String toString() {

      return "Person {name:" + this.getName() + ",age:" + this.getAge() + "}";
    }
  }

public class PersonSerde implements Serde {

  @Override
  public void configure(Map map, boolean b) {

  }

  @Override
  public void close() {

  }

  @Override
  public Serializer serializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<Person> personSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personSerializer.configure(serdeProps, false);

    return personSerializer;
  }

  @Override
  public Deserializer deserializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Deserializer<Person> personDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personDeserializer.configure(serdeProps, false);

    return personDeserializer;
  }
}

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, personSerde.getClass());

KTable<String, Long> count = people.selectKey((key, value) -> value.getAge()).groupByKey(Serialized.with(Serdes.String(), personSerde))
      .count();

Ошибка

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.example.kafkastreams.KafkaStreamsApplication$Person and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:313)

Редактировать 5

Таким образом, получается, что когда я сопоставляю значения со строкой, счетчик работает правильно.Но когда я использую его на пользовательском объекте, он завершается с ошибкой

KStream<String, Person> people = builder.stream("person-topic", Consumed.with(Serdes.String(), personSerde));
people.print(Printed.<String, Person>toSysOut().withLabel("person-source"));

KStream<String, Person> agePersonKStream = people.selectKey((key, value) -> value.getAge());
agePersonKStream.print(Printed.<String, Person>toSysOut().withLabel("age-person"));

KStream<String, String> stringStringKStream = agePersonKStream.mapValues((person -> person.name));
stringStringKStream.print(Printed.<String, String>toSysOut().withLabel("age-name"));

KTable<String, Long> stringLongKTable = stringStringKStream.groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count();
stringLongKTable.toStream().print(Printed.<String, Long>toSysOut().withLabel("age-count"));

Без 3-го шага для сопоставления значения с именем, шаг 4 завершается неудачей.

1 Ответ

0 голосов
/ 10 февраля 2019

Вопрос-1 Я понимаю, что данные в теме представлены в байтах.Я не устанавливаю никаких Serdes для ключа или значения, чтобы начать с.Преобразует ли KStream входные данные из байтов в Person и печатает здесь адрес Person?

Если вы не укажете Serde в StreamsConfig или в builder.stream(..., Consumers.with(/*serdes*/)), байты не будутбыть преобразованным в Person объект, но объект будет иметь тип byte[].Таким образом, print() вызовет byte[].toString(), что приведет к зашифрованному выводу ([B@7e37bab6), который вы видите.

Вопрос-2 Когда я добавлю приведенное ниже значение Serdes, я получу более значимый вывод,Байтная информация здесь преобразуется в String, а затем в Person?Почему значение теперь печатается правильно?

Когда вы указываете Serde.String() в StreamsConfig, байты преобразуются в тип String.Кажется, что StringSerde способен десериализовать байты осмысленным образом - но это совпадение, что он работает вообще.Кажется, что ваши данные на самом деле сериализуются в JSON, что объясняет, почему StringSerde() может преобразовать байты в String.

Вопрос-3 Теперь, когда выполняется подсчет возраста,Я получаю ошибку во время выполнения при преобразовании строки в Person.Если groupBy устанавливает возраст в качестве Ключа, а счетчик - в качестве Длинного, почему происходит преобразование Строка в Человека?

Это ожидается.Поскольку байты преобразуются в объект String (как вы указали Serdes.String()), преобразование не может быть выполнено.

Заключительные замечания:

Вы неполучить исключение приведения класса, если вы используете только print(), потому что в этом случае операция приведения не выполняется.Java только вставляет операцию приведения, если требуется.

Для groupBy() вы используете value.getAge(), и, следовательно, Java вставляет приведение сюда (она знает, что ожидаемый тип - Person, потому что он указан с помощью KStream<Object, Person> people = ...Для print() вызывается только toString(), который является определением для Object, и, следовательно, приведение не требуется.

Обобщение в Java a указывает на тип компилятора и заменяется на Object (или приведено, еслитребуется во время компиляции). Таким образом, для print() переменная Object может без проблем указывать на byte[] и toString() вызывается успешно. Для случая groupBy() компилятор приводит Object к Personбыть в состоянии вызвать getAge() - однако, это терпит неудачу, потому что на самом деле тип является String.

Чтобы ваш код работал, вам нужно создать класс PersonSerde extend Serde<Person> и указать его как значение serde.

...