Flink Kafka - Данные пользовательских классов всегда равны нулю - PullRequest
0 голосов
/ 02 июля 2018

Пользовательский класс

лицо

class Person
{
  private Integer id;
  private String name; 
 //getters and setters
}

Соединитель Kafka Flink

TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));

Теперь, если я отправлю ниже JSON

{ "id" : 1, "name": Synd }

через Kafka Console Producer, код flink генерирует исключение нулевого указателя Но если я использую SimpleStringSchema вместо CustomSchema, как определено ранее, поток печатается.

Что не так в вышеуказанной настройке

Ответы [ 2 ]

0 голосов
/ 03 июля 2018

Ответьте, у кого такой же вопрос

Пользовательский сериализатор

class PersonSchema implements DeserializationSchema<Person>{

    private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
        return mapper.readValue( bytes, Person.class );
    }

    @Override
    public boolean isEndOfStream(Person person) {
        return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
        return TypeInformation.of(new TypeHint<Person>(){});
    }
}

Использование схемы

DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));
0 голосов
/ 03 июля 2018

TypeInformationSerializationSchema - это схема де / сериализации, которая использует стек сериализации Flink и, следовательно, также его сериализатор. Поэтому при использовании этого SerializationSchema Flink ожидает, что данные были сериализованы с помощью сериализатора Flink для типа Person.

Учитывая отрывок из класса Person, Flink, скорее всего, будет использовать его PojoTypeSerializer. Подача входных данных JSON не будет понята этим сериализатором.

Если вы хотите использовать JSON в качестве формата ввода, вам нужно определить свой собственный DeserializationSchema, который может анализировать JSON в Person.

...