Как использовать FlinkKafkaConsumer для разбора ключа отдельновместо <T> - PullRequest
0 голосов
/ 15 ноября 2018

Из того, что я могу сказать, с помощью десериализации Flink AVRO вы можете создать поток Avro-объектов, и это нормально, но, похоже, существует проблема, когда потребитель Flink kafka создает только потоки одного объекта: FlinkKafkaConsumerBase<T> asв отличие от используемого по умолчанию Kafka API с его KafkaConsumer.

В моем случае и ключ, и значение являются отдельными объектами, совместимыми с AVRO-схемами, и объединение их схем может быть кошмаром ...

Кроме того,кажется, что с Flink API я не могу получить информацию ConsumerRecord? ...

1 Ответ

0 голосов
/ 16 ноября 2018

На основе Flink Kafka Consumer существует конструктор:

public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
    this(Collections.singletonList(topic), deserializer, props);
}

Второй параметр - KeyedDeserializationSchema используется для десериализации записи Кафки. Он включает в себя ключ сообщения, значение сообщения, смещение, тему и т. Д. Таким образом, вы можете реализовать свой собственный тип с именем MyKafkaRecord как T с ключом Avro и значением Avro в нем. Затем передайте MyKafkaRecord как T вашей реализации KeyedDeserializationSchema. См. TypeInformationKeyValueSerializationSchema в качестве примера.

например. Чтение дополнительной информации от Кафки:

class KafkaRecord<K, V> {
  private K key;
  private V value;
  private long offset;
  private int partition;
  private String topic;

  ...
}

class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
  KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    KafkaRecord<K, V> rec = new KafkaRecord<>();
    rec.key = KEY_DESERIaLISER.deserialize(messageKey);
    rec.value = ...;
    rec.topic = topic;
    ...
  }
}
...