Я использую spark со структурированным потоком для чтения данных схемы avro из kafka topi c in JAVA. Но при десериализации данных avro возникает проблема.
Я использую следующие артефакты:
spark-core_2.12: 2.4.5 spark-sql_2.12: 2.4.5 spark- sql -kafka-0-10_2.12: 2.4.5 kafka-avro-serializer: 5.3.0 spark-streaming_2.12 : 2.4.5 org. apache .spark: spark-avro: 2.4.5
Ниже приведен код для чтения avro (десериализация):
...
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("key.deserializer", KafkaAvroDeserializer.class.getName())
.option("value.deserializer", KafkaAvroDeserializer.class.getName())
.option("schema.registry.url", "http://127.0.0.1:8081")
.option("specific.avro.reader", "true")
.option("subscribe", "epeTopicdt")
.load()
.selectExpr("CAST(value AS STRING) as JSON");
...
, даже если я использовал from_avro для десериализации данных avro, но это не происходит в Java.
Если кто-нибудь знает решение, пожалуйста, помогите нам решить эту проблему.