Я новичок в потоковой передаче kafka-spark и пытаюсь реализовать примеры из документации spark с использованием протокола сериализатора / десериализации буфера. До сих пор я следовал официальным учебникам на
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
https://developers.google.com/protocol-buffers/docs/javatutorial
и теперь я застрял со следующей проблемой. Этот вопрос может быть похож на этот пост Как десериализовать записи из Kafka с использованием структурированного потокового вещания в Java?
Я уже успешно реализовал сериализатор, который пишет сообщения на тему кафки. Теперь задача состоит в том, чтобы потреблять его с помощью искровой структурированной потоковой передачи с помощью специального десериализатора.
public class CustomDeserializer implements Deserializer<Person> {
@Override
public Person deserialize(String topic, byte[] data) {
Person person = null;
try {
person = Person.parseFrom(data);
return person;
} catch (Exception e) {
//ToDo
}
return null;
}
Dataset<Row> dataset = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topic)
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("value.deserializer", "de.myproject.CustomDeserializer")
.load()
.select("value");
dataset.writeStream()
.format("console")
.start()
.awaitTermination();
Но в качестве вывода я все еще получаю двоичные файлы.
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+
Что касается учебника, мне просто нужно установить опцию для value.deserializer, чтобы иметь читаемый человеком формат
.option("value.deserializer", "de.myproject.CustomDeserializer")
Я что-то пропустил?