Я пытаюсь десериализовать сообщение Avro от kafka на apache flink
В настоящее время я делаю это, внедряя интерфейс DeserializationSchema, но оно определено, есть ли лучшая форма для достижения этой цели?
Спасибо.
Это мои уроки
public final class FlinkKafkaExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// setup Kafka sink
ConfluentAvroDeserializationSchema deserSchema = new
ConfluentAvroDeserializationSchema("http://localhost:8081", 1000);
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "0.0.0.0:9092");
kafkaProps.setProperty("zookeeper.connect", "0.0.0.0:2181");
kafkaProps.setProperty("group.id", "org.apache.flink");
FlinkKafkaConsumer08<String> flinkKafkaConsumer =
new FlinkKafkaConsumer08<String>("conekta.public.codes", deserSchema, kafkaProps);
DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
kafkaStream.print();
env.execute("Flink Kafka Java Example");
}
}
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<String> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
this.schemaRegistryUrl = schemaRegistryUrl;
this.identityMapCapacity = identityMapCapacity;
}
public String deserialize(byte[] message) {
if (kafkaAvroDecoder == null) {
SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
}
return this.kafkaAvroDecoder.fromBytes(message).toString();
}
public boolean isEndOfStream(String nextElement) {
return false;
}
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}