Я работаю с кафкой и апачем флинк. Я пытаюсь использовать записи (которые в формате avro) из темы Кафка в Apache Flink. Ниже приведен фрагмент кода, который я пытаюсь использовать.
Использование пользовательского десериализатора для десериализации авро-записей из темы.
Схема Avro для данных, которые я отправляю в тему "test-topic", указана ниже.
{
"namespace": "com.example.flink.avro",
"type": "record",
"name": "UserInfo",
"fields": [
{"name": "name", "type": "string"}
]
}
Используемый мной десериализатор приведен ниже.
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
private static final long serialVersionUID = 1L;
private final Class<T> avroType;
private transient DatumReader<T> reader;
private transient BinaryDecoder decoder;
public AvroDeserializationSchema(Class<T> avroType) {
this.avroType = avroType;
}
public T deserialize(byte[] message) {
ensureInitialized();
try {
decoder = DecoderFactory.get().binaryDecoder(message, decoder);
T t = reader.read(null, decoder);
return t;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
private void ensureInitialized() {
if (reader == null) {
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
reader = new SpecificDatumReader<T>(avroType);
} else {
reader = new ReflectDatumReader<T>(avroType);
}
}
}
public boolean isEndOfStream(T nextElement) {
return false;
}
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avroType);
}
}
И так написано мое приложение Flink.
public class FlinkKafkaApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "localhost:9092");
kafkaProperties.put("group.id", "test");
AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);
FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);
DataStreamSource<UserInfo> userStream = env.addSource(consumer);
userStream.map(new MapFunction<UserInfo, UserInfo>() {
@Override
public UserInfo map(UserInfo userInfo) {
return userInfo;
}
}).print();
env.execute("Test Kafka");
}
Я пытаюсь распечатать запись, отправленную в тему, как показано ниже.
{"name": "sumit"}
Выход:
Я получаю вывод
{ "Имя": ""}
Может ли кто-нибудь помочь выяснить, в чем здесь проблема и почему я не получаю {"name": "sumit"} в качестве вывода.