Основная цель - объединить две темы Кафки, одну - сжатые медленно движущиеся данные, а другую - быстро движущиеся данные, которые принимаются каждую секунду.
Мне удалось использовать сообщения в простых сценариях, таких как KV (Long, String), используя что-то вроде:
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
Но, похоже, вы не подходите, когда вынужно десериализовать от AVRO.У меня есть KV (STRING, AVRO), который мне нужно потреблять.
Я попытался сгенерировать Java-классы из схемы AVRO, а затем включить их в «apply», например:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
Но это не был правильный подход.
Есть ли какая-либо документация / примеры, на которые кто-нибудь мог бы указать мне, чтобы я мог понять, как вы будете работать с Kafka AVRO и Beam.Любая помощь приветствуется.
Я обновил свой код:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
}
}
#######################################################
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;
Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}
Но теперь я получаю следующие ошибки несовместимых типов: java.lang.Class не может быть преобразованв java.lang.Class <?extends org.apache.kafka.common.serialization.Deserializer <java.lang.String>>
Я должен импортировать неправильные сериализаторы?