Как десериализовать сообщения Kafka AVRO с использованием Apache Beam - PullRequest
0 голосов
/ 19 февраля 2019

Основная цель - объединить две темы Кафки, одну - сжатые медленно движущиеся данные, а другую - быстро движущиеся данные, которые принимаются каждую секунду.

Мне удалось использовать сообщения в простых сценариях, таких как KV (Long, String), используя что-то вроде:

PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, 
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)

PCollection<String> output = input.apply(Values.<String>create());

Но, похоже, вы не подходите, когда вынужно десериализовать от AVRO.У меня есть KV (STRING, AVRO), который мне нужно потреблять.

Я попытался сгенерировать Java-классы из схемы AVRO, а затем включить их в «apply», например:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

Но это не был правильный подход.

Есть ли какая-либо документация / примеры, на которые кто-нибудь мог бы указать мне, чтобы я мог понять, как вы будете работать с Kafka AVRO и Beam.Любая помощь приветствуется.

Я обновил свой код:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

public static void main(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
    );

    p.run();

}
}
#######################################################
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;

Myclass(){}
Myclass(String n, String a) {
    this.name= n;
    this.age= a;
}
}

Но теперь я получаю следующие ошибки несовместимых типов: java.lang.Class не может быть преобразованв java.lang.Class <?extends org.apache.kafka.common.serialization.Deserializer <java.lang.String>>

Я должен импортировать неправильные сериализаторы?

Ответы [ 4 ]

0 голосов
/ 30 апреля 2019

Хороший ответ Йохея, но я также обнаружил, что это работает

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

...

public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}

...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...

, где MyCustomClass - это код, созданный с помощью инструментов Avro.

0 голосов
/ 24 апреля 2019

Измените KafkaIO.<Long, String>read() на KafkaIO.<Long, Object>read().

Если вы посмотрите на реализацию KafkaAvroDeserializer, он реализует десериализатор:

public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>

0 голосов
/ 30 апреля 2019

Я столкнулся с той же проблемой.Нашел решение в этом почтовом архиве.http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E*10035100 в вашем случаевам нужно определить свой собственный KafkaAvroDeserializer , например, следующим образом.

public class MyClassKafkaAvroDeserializer extends
  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
      configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public MyClass deserialize(String s, byte[] bytes) {
      return (MyClass) this.deserialize(bytes);
  }

  @Override
  public void close() {} }

Затем укажите KafkaAvroDeserializer в качестве ValueDeserializer.

p.apply(KafkaIO.<Long, MyClass>read()
 .withKeyDeserializer(LongDeserializer.class)
 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
0 голосов
/ 19 февраля 2019

Вы можете использовать KafkaAvroDeserializer следующим образом:

PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

Где MyClass - это схема Avro, созданная классом POJO.

Убедитесь, что ваш класс POJO имеет аннотацию AvroCoder какв приведенном ниже примере:

@DefaultCoder(AvroCoder.class)
   public class MyClass{
      String name;
      String age;

      MyClass(){}
      MyClass(String n, String a) {
         this.name= n;
         this.age= a;
      }
  }
...