Confluent Kafka: confluent_kafka.avro.serializer.SerializerError: unable to fetch schema
August 24, 2018

When we try to read Avro-formatted messages from a Kafka topic using confluent_kafka.avro.serializer, the schema for the topic cannot be fetched.

Code snippet:

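    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # Schema Registry client and the Avro decoder used for message values
    schema_registry_client = CachedSchemaRegistryClient(url='http://ashaplq00003:8081')
    serializer = MessageSerializer(schema_registry_client)

    spark = SparkSession.builder \
        .appName('SparkCassandraApp') \
        .config('spark.cassandra.connection.host', 'localhost') \
        .config('spark.cassandra.connection.port', '9042') \
        .config('spark.cassandra.output.consistency.level', 'ONE') \
        .master('local[2]') \
        .getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    # handler is defined elsewhere (not shown in this snippet)
    kvs = KafkaUtils.createDirectStream(
        ssc,
        ['NBC_APPS.TBL_MS_ADVERTISER'],
        {"metadata.broker.list": 'ashaplq00003:9192'},
        valueDecoder=serializer.decode_message)
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()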
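
For context on what `decode_message` receives as `valueDecoder`: messages written by Confluent's Avro serializers are framed with a one-byte magic value (0) followed by a 4-byte big-endian schema ID, and only then the Avro payload. The sketch below only illustrates that framing so the schema ID in a raw message can be inspected by hand. The helper name `read_schema_id` and the sample bytes are made up for illustration and are not part of confluent_kafka.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed Avro message


def read_schema_id(message: bytes) -> int:
    """Return the schema ID stored in the 5-byte wire-format header."""
    if message is None or len(message) < 5:
        raise ValueError("message is too short to contain a wire-format header")
    # ">bI" = big-endian signed byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte %d; not Confluent-framed Avro" % magic)
    return schema_id


# Hypothetical sample: magic byte 0, schema ID 77, then arbitrary payload bytes.
sample = struct.pack(">bI", 0, 77) + b"\x02\x06abc"
print(read_schema_id(sample))  # prints 77
```

If the ID read this way does not exist in the registry the client points at, the decoder has nothing to look up, which would be consistent with a "unable to fetch schema" error.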

Error message:

      File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
        process()
      File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 131, in funcWithoutMessageHandler
      File "/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 215, in decode_message
        decoder_func = self._get_decoder_func(schema_id, payload)
      File "/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 165, in _get_decoder_func
        raise SerializerError("unable to fetch schema with id %d" % (schema_id))
    confluent_kafka.avro.serializer.SerializerError: unable to fetch schema with id 77
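
One way to narrow this down is to ask the registry for schema 77 directly, from the same machine where the Spark worker runs (the traceback shows the decode happening inside `pyspark/worker.py`). The sketch below assumes the registry at `http://ashaplq00003:8081` exposes the standard Schema Registry REST endpoint `GET /schemas/ids/{id}`. The helper names `schema_url` and `fetch_schema` are hypothetical, and this is a diagnostic aid, not a fix.

```python
import json
import urllib.error
import urllib.request


def schema_url(registry_url: str, schema_id: int) -> str:
    """Build the REST URL for looking up a schema by its numeric ID."""
    return "%s/schemas/ids/%d" % (registry_url.rstrip("/"), schema_id)


def fetch_schema(registry_url: str, schema_id: int, timeout: float = 5.0):
    """Return the schema string, or None if the registry cannot supply it."""
    url = schema_url(registry_url, schema_id)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))["schema"]
    except (urllib.error.URLError, OSError, ValueError, KeyError) as exc:
        # Covers unreachable hosts, HTTP errors such as 404, and malformed bodies.
        print("could not fetch schema %d from %s: %s" % (schema_id, url, exc))
        return None


# Example call (performs a network request, so it is left commented out):
# print(fetch_schema("http://ashaplq00003:8081", 77))
```

If the call returns None on the worker host but succeeds elsewhere, the problem is more likely connectivity or the registry URL than the Avro data itself. If it fails everywhere, the ID may belong to a different registry than the one configured.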
...