Я использую spark 2.3.1 и kafka 0.10 с приведенными ниже зависимостями: kafka-avro-serializer-3.2.1.jar, kafka-schema-registry-client-3.2.1.jar, avro-1.8.1.jar, spark-avro_2.11-2.4.0.jar, spark-sql-kafka-0-10_2.11-2.4.0.jar, spark-streaming-kafka-0-10_2.11-2.4.0.jar
Работа над структурированной потоковой передачей в 2.3 с библиотеками 2.4 для использования from_avro, как показано ниже.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.types._
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))
val rawTopicMessageDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).
option("subscribe", topic).
option("avroSchema",jsonFormatSchema).
option("startingOffsets", "latest").
option("maxOffsetsPerTrigger", 20).load()
val output = rawTopicMessageDF.select(from_avro('value, jsonFormatSchema) as 'user)
Я вижу ошибку ниже, когда выбираю значение из rawTopicMessageDF.
java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:51)
canкто-нибудь, пожалуйста, помогите решить эту проблему