Вы можете сделать пару вещей:
Вы можете определить RecordTranslator
. Этот интерфейс позволяет вам определить, как носик будет создавать кортеж, основываясь на ConsumerRecord
, который он прочитал из Kafka.
Реализация по умолчанию выглядит следующим образом:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
Как видите, вы получите ConsumerRecord
, который является типом, встроенным в базовую клиентскую библиотеку Kafka, а затем должны превратить его в List<Object>
, который будет значениями кортежа. Если вы хотите сделать что-то сложное с записью перед отправкой данных, это будет то, как вы это сделаете. Например, если вы хотите вставить ключ, значение и смещение в структуру данных, которую он затем отправил, вы можете сделать это здесь. Вы используете переводчик как KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
Лучшая альтернатива, если вы хотите десериализовать ключ / значение только в один из ваших собственных классов данных, это реализовать Deserializer
. Это позволит вам определить, как десериализовать ключ / значение, которое вы получаете от Kafka. Если вы хотите десериализовать, например, значение в свой собственный класс данных, вы можете сделать это с помощью этого интерфейса.
По умолчанию StringDeserializer
делает это:
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
После того, как вы создали ваш собственный Deserializer
, вы используете его, делая что-то вроде KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build()
. Для настройки десериализатора значения существует аналогичное свойство потребителя.