Spark Структурированный поток с десериализацией данных Avro из topi c In Java - PullRequest
0 голосов
/ 31 марта 2020

Я использую spark со структурированным потоком для чтения данных схемы avro из kafka topi c in JAVA. Но при десериализации данных avro возникает проблема.
Я использую следующие артефакты:

spark-core_2.12: 2.4.5 spark-sql_2.12: 2.4.5 spark- sql -kafka-0-10_2.12: 2.4.5 kafka-avro-serializer: 5.3.0 spark-streaming_2.12 : 2.4.5 org. apache .spark: spark-avro: 2.4.5

Ниже приведен код для чтения avro (десериализация):

...

Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("key.deserializer", KafkaAvroDeserializer.class.getName())
            .option("value.deserializer", KafkaAvroDeserializer.class.getName())
            .option("schema.registry.url", "http://127.0.0.1:8081")
            .option("specific.avro.reader", "true")
            .option("subscribe", "epeTopicdt")
            .load()
            .selectExpr("CAST(value AS STRING) as JSON");

...

, даже если я использовал from_avro для десериализации данных avro, но это не происходит в Java.

Если кто-нибудь знает решение, пожалуйста, помогите нам решить эту проблему.

...