Как правильно использовать тему Kafka с структурированной потоковой передачей Java Spark - PullRequest
0 голосов
/ 04 июля 2019

Я новичок в потоковой передаче kafka-spark и пытаюсь реализовать примеры из документации spark с использованием протокола сериализатора / десериализации буфера. До сих пор я следовал официальным учебникам на

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html https://developers.google.com/protocol-buffers/docs/javatutorial

и теперь я застрял со следующей проблемой. Этот вопрос может быть похож на этот пост Как десериализовать записи из Kafka с использованием структурированного потокового вещания в Java?

Я уже успешно реализовал сериализатор, который пишет сообщения на тему кафки. Теперь задача состоит в том, чтобы потреблять его с помощью искровой структурированной потоковой передачи с помощью специального десериализатора.

public class CustomDeserializer implements Deserializer<Person> {

@Override
public Person deserialize(String topic, byte[] data) {
    Person person = null;
    try {
        person = Person.parseFrom(data);

        return person;
    } catch (Exception e) {
               //ToDo
    }

    return null;
 }


Dataset<Row> dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.deserializer", "de.myproject.CustomDeserializer")
        .load()
        .select("value");

    dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();

Но в качестве вывода я все еще получаю двоичные файлы.

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+

Что касается учебника, мне просто нужно установить опцию для value.deserializer, чтобы иметь читаемый человеком формат

.option("value.deserializer", "de.myproject.CustomDeserializer")

Я что-то пропустил?

Ответы [ 2 ]

0 голосов
/ 05 июля 2019

Вам необходимо преобразовать байт в String datatype. dataset.selectExpr («CAST (ключ как STRING)», «CAST (значение как STRING)»)

Затем вы можете использовать функции. from_json (dataset.col ("value"), StructType), чтобы вернуть фактический DF.

Happy Coding:)

0 голосов
/ 05 июля 2019

Вы пропустили этот раздел документации?

Обратите внимание, что следующие параметры Кафки не могут быть установлены, и источник или приемник Кафки будут выдавать исключение:

  • key.deserializer : ключи всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer. Используйте операции DataFrame для явной десериализации ключей.
  • value.deserializer : Значения всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer. Используйте операции DataFrame для явной десериализации значений.

Вам придется зарегистрировать UDF, который вместо этого вызывает ваши десериализаторы

Аналогично Считывание сообщения protobuf kafka с использованием потоковой структурированной искры

...