I am finding it hard to read Kafka AVRO-format data. I wrote Spark Streaming code that decodes the payload with Twitter Bijection, but I get an inversion failure on every record.
Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860
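From similar reports, I suspect the inversion fails because Confluent's KafkaAvroSerializer prepends a 5-byte header (a zero "magic" byte plus a 4-byte schema ID) to the Avro body, and Bijection's binary codec does not expect that header. A minimal sketch of the workaround I have in mind (the helper name invertConfluentPayload is my own, and it assumes the producer really used the Confluent wire format):

import java.nio.ByteBuffer

import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericRecord

// Strip the Confluent wire-format header (magic byte 0x0 + 4-byte schema ID)
// and hand only the remaining Avro binary body to Bijection.
def invertConfluentPayload(payload: Array[Byte],
                           injection: Injection[GenericRecord, Array[Byte]]): GenericRecord = {
  val buf = ByteBuffer.wrap(payload)
  require(buf.get() == 0x0, "unknown magic byte: not Confluent wire format")
  buf.getInt() // skip the 4-byte schema ID
  val body = new Array[Byte](buf.remaining())
  buf.get(body)
  injection.invert(body).get
}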
The code I am using:
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object ReadKafkaAvro1 {

  object Injection {
    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    // Fetch the latest value schema for the topic from the Schema Registry
    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    val parser = new Schema.Parser()
    // val schema = parser.parse(getClass.getResourceAsStream("src\\main\\resources\\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse(valueRestResponseSchema.getSchema)

    // Bijection codec for raw (header-less) Avro binary <-> GenericRecord
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadKafkaAvro").setMaster("local[*]")
    val streamingCtx = new StreamingContext(conf, Seconds(30))
    val schemaRegistryURL1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = Array("b24_tx_financial_formatted_clean")

    streamingCtx.sparkContext.setLogLevel("ERROR")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      // Values are consumed as raw bytes and decoded by Bijection afterwards
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "b24_ptlf_eim_processing",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaRegistryURL1,
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SSL",
      "ssl.keystore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "BW^1=|sY$j",
      "ssl.key.password" -> "BW^1=|sY$j",
      "ssl.truststore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "iB>3v$6m@9",
      "ssl.keystore.type" -> "JCEKS",
      "ssl.truststore.type" -> "JCEKS",
      // Decode to GenericRecord; only honored by KafkaAvroDeserializer,
      // and the value must be lowercase "true"/"false"
      "specific.avro.reader" -> "false"
    )
    val inputStream = KafkaUtils.createDirectStream[String, Array[Byte]](
      streamingCtx, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))

    // Invert the raw Avro bytes back into a GenericRecord.
    // NOTE: this is where the InversionFailure above is thrown when the
    // bytes still carry the Confluent wire-format header.
    val recordStream = inputStream.map(msg => Injection.injection.invert(msg.value()).get)
    // .map(record => (record.get("AuthorizationTransactionSource"), record.get("AuthorizationTransactionSourceID")))

    inputStream.map(x => (x.key, x.value)).print() // raw key/value pairs, for debugging
    recordStream.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
  }
}
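In case hand-stripping the header is the wrong direction, the alternative I am also considering is to let Confluent's KafkaAvroDeserializer handle the header and the registry lookup itself. A sketch (untested against this cluster) that reuses streamingCtx, topics, and kafkaParams from main:

import io.confluent.kafka.serializers.KafkaAvroDeserializer

// Override only the value deserializer; the Confluent class strips the
// wire-format header and fetches the writer schema from the registry.
val avroParams = kafkaParams ++ Map[String, Object](
  "value.deserializer" -> classOf[KafkaAvroDeserializer].getName,
  "specific.avro.reader" -> (false: java.lang.Boolean) // decode to GenericRecord
)
val avroStream = KafkaUtils.createDirectStream[String, Object](
  streamingCtx, PreferConsistent, Subscribe[String, Object](topics, avroParams))
avroStream.map(_.value.asInstanceOf[GenericRecord]).print()

Would either of these be the right way to consume Confluent-encoded Avro from Spark Streaming?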