Spark: потребитель Kafka получает данные в виде строк в кодировке base64, хотя Producer не выполняет явное кодирование - PullRequest
0 голосов
/ 11 января 2019

Я пробую простой пример публикации данных в Kafka и использования их с помощью Spark.

Вот код производителя:

var kafka_input = spark.sql("""
SELECT CAST(Id AS STRING) as key, 
       to_json(
               named_struct(
                             'Id', Id,                             
                             'Title',Title                           
                           )
              ) as value 
FROM offer_data""")

kafka_input.write
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBrokers)
          .option("topic", topicName)
          .save()

Я проверил, что kafka_input имеет строку json для значения и число, приведенное в качестве строки для ключа.

Вот код потребителя:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", topicName)
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

df.take(50)
display(df)

Данные, которые я получаю на стороне потребителя, представляют собой строку в кодировке base64.

enter image description here

Как мне декодировать значение в Scala? Кроме того, этот оператор чтения не сбрасывает эти записи из очереди Kafka. Я предполагаю, что это потому, что я не посылаю никаких сигналов подтверждения обратно Кафке. Это верно? Если да, то как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 12 января 2019

Проблема была в том, что я использовал SelectExpr. Он не выполняет преобразование на месте. Он возвращает преобразованные данные. Исправлено:

df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

display(df1)
0 голосов
/ 11 января 2019

попробуйте это ..

df.foreach(row => {
  val key = row.getAs[Array[Byte]]("key")
  val value = row.getAs[Array[Byte]]("value")
  println(scala.io.Source.fromBytes(key,"UTF-8").mkString)
  println(scala.io.Source.fromBytes(value,"UTF-8").mkString)
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...