Apache Kafka со структурным потоковым протоколом - PullRequest
0 голосов
/ 24 сентября 2019

Я пытаюсь написать потребителя Kafka (из protobuf), используя структурированную потоковую передачу.Давайте назовем protobuf A, который должен быть десериализован как байтовый массив (Array [Byte]) в Scala.

Я перепробовал все методы, которые могу найти в сети, но все еще не мог понять, как правильно проанализировать сообщение A

Метод 1: Из руководства по интеграции (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html) я должен привести значение как String. Но даже если я получу getBytes для преобразования строки в байт для анализа моего сообщения A, я получу:

Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8

Метод 2: Я хочу преобразовать значение непосредственно в байтовый массив. Я бы получил:

missing ')' at '['(line 1, pos 17)

== SQL ==
CAST(key AS Array[Byte])

Метод 3: Я следовал (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html), чтобы написать свой собственный десериализаторof protobuf. Но получил сообщение об ошибке:

Schema for type A is not supported

Выше три метода - это, вероятно, все методы, которые я могу найти в Интернете. Это должен быть простой и распространенный вопрос, поэтому, если у кого-то есть понимание, пожалуйста, дайтея знаю.

Спасибо!

1 Ответ

0 голосов
/ 30 сентября 2019

Схема DataFrame, созданная из потокового источника:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Таким образом, ключ и значение фактически находятся в Array[Byte].Вам придется выполнить десериализацию в операциях с Dataframe.

Например, у меня есть это для десериализации Kafka:

  import sparkSession.implicits._

  sparkSession.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option(
      "kafka.bootstrap.servers",
      bootstrapServers
    )
    .load()
    .selectExpr("key", "value") // Selecting only key & value
    .as[(Array[Byte], Array[Byte])]
    .flatMap {
      case (key, value) =>
        for {
          deserializedKey <- Try {
            keyDeserializer.deserialize(topic, key)
          }.toOption
          deserializedValue <- Try {
            valueDeserializer.deserialize(topic, value)
          }.toOption
        } yield (deserializedKey, deserializedValue)
    }

Вам нужно изменить это, чтобы десериализовать ваши записи protobuf. * ​​1011 *

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...