Да, это немного сложно и не очевидно с первого взгляда. Вам нужно иметь Kafka Deserializer
(или Serializer
, если вы пишете в Kafka), чтобы интерпретировать байты ключа и значения для объектов Java, которые вы читаете из Kafka. В то же время, Beam требует от нас предоставить Coder
s для материализации промежуточных данных наших PCollection
s во время выполнения.
Кодеры не относятся к (де) сериализаторам (которые отвечают за интерпретацию сообщений Кафки), поэтому нам необходимо явно предоставить кодеры. Хотя KafkaIO
попытается вывести кодер из десериализатора, и во многих случаях он будет работать неявно, но если это не удастся или вы захотите указать конкретный кодер, вы можете указать его отдельно.
Например,если ваши сообщения Kafka сериализуются в формате Avro, вы можете использовать KafkaAvroDeserializer
и внутренний луч AvroCoder
.
public static void main(String[] args) {
...
KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(MyClass.class));
...
}
@DefaultCoder(AvroCoder.class)
public class MyClass {
String name;
String age;
MyClass() {}
MyClass(String n, String a) {
this.name = n;
this.age = a;
}
}