Spark Stream - кодек utf8 не может декодировать байты - PullRequest
0 голосов
/ 08 октября 2018

Я довольно новичок в потоковом программировании.У нас есть Kafka Stream, который использует Avro.

Я хочу подключить Kafka Stream к Spark Stream.Я использовал приведенный ниже код.

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1]) 

Я получил приведенную ниже ошибку.

return Файл s.decode ('utf-8') "/usr/lib64/python2.7/encodings / utf_8.py ", строка 16, в декодировании возвращает codecs.utf_8_decode (input, errors, True) UnicodeDecodeError: кодек utf8 не может декодировать байты в позиции 57-58: недопустимый байт продолжения

Нужно ли указывать, что Кафка использует Avro. Выше ошибки для этого?Если это так, как я могу это указать?

Ответы [ 2 ]

0 голосов
/ 08 октября 2018

Да, вы должны указать его.

С java:

создание потока:

final JavaInputDStream<ConsumerRecord<String, avroType>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaParams));

в конфигурации потребителя kafka:

kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
        kafkaParams.put("value.deserializer", SpecificAvroDeserializer.class);
0 голосов
/ 08 октября 2018

Правильно, проблема в десериализации потока.Вы можете использовать библиотеку confluent-kafka-python и указать valueDecoder in:

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`

Кредиты для решения до https://stackoverflow.com/a/49179186/6336337

...