В чем разница между кодером и десериализатором Kafka в контексте использования Kafka и Apache Beam? - PullRequest
0 голосов
/ 16 октября 2019

Я новичок в Apache Beam. Я пытаюсь прочитать данные из Кафки, используя KafKaIO согласно документации. При создании PCollection метод withValueDeserializerAndCoder позволяет установить кодер и десериализатор. Я не мог понять, почему нам может потребоваться как десериализатор, так и кодер. На мой взгляд, оба представляют представление байтовых потоков в виде объектов Java. Так зачем нам оба? Это потому, что Beam - это скорее фреймворк, который позволяет нескольким бегунам внизу?

1 Ответ

2 голосов
/ 16 октября 2019

Да, это немного сложно и не очевидно с первого взгляда. Вам нужно иметь Kafka Deserializer (или Serializer, если вы пишете в Kafka), чтобы интерпретировать байты ключа и значения для объектов Java, которые вы читаете из Kafka. В то же время, Beam требует от нас предоставить Coder s для материализации промежуточных данных наших PCollection s во время выполнения.

Кодеры не относятся к (де) сериализаторам (которые отвечают за интерпретацию сообщений Кафки), поэтому нам необходимо явно предоставить кодеры. Хотя KafkaIO попытается вывести кодер из десериализатора, и во многих случаях он будет работать неявно, но если это не удастся или вы захотите указать конкретный кодер, вы можете указать его отдельно.

Например,если ваши сообщения Kafka сериализуются в формате Avro, вы можете использовать KafkaAvroDeserializer и внутренний луч AvroCoder.

public static void main(String[] args) {
...
  KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
          AvroCoder.of(MyClass.class));
...
}

@DefaultCoder(AvroCoder.class) 
public class MyClass {

  String name;
  String age;

  MyClass() {}

  MyClass(String n, String a) {
    this.name = n;
    this.age = a;
  }
}
...