На основе Flink Kafka Consumer существует конструктор:
public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
Второй параметр - KeyedDeserializationSchema
используется для десериализации записи Кафки. Он включает в себя ключ сообщения, значение сообщения, смещение, тему и т. Д. Таким образом, вы можете реализовать свой собственный тип с именем MyKafkaRecord
как T с ключом Avro и значением Avro в нем. Затем передайте MyKafkaRecord
как T
вашей реализации KeyedDeserializationSchema
. См. TypeInformationKeyValueSerializationSchema
в качестве примера.
например. Чтение дополнительной информации от Кафки:
class KafkaRecord<K, V> {
private K key;
private V value;
private long offset;
private int partition;
private String topic;
...
}
class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
KafkaRecord<K, V> rec = new KafkaRecord<>();
rec.key = KEY_DESERIaLISER.deserialize(messageKey);
rec.value = ...;
rec.topic = topic;
...
}
}