Разделение сообщений Кафки построчно в Spark структурированной потоковой передаче - PullRequest
0 голосов
/ 08 февраля 2019

Я хочу прочитать сообщение из темы Кафки в моей работе Spark Structured Streaming во фрейм данных.но я получаю все сообщение в одном смещении, поэтому в кадре данных только это сообщение входит в одну строку вместо нескольких строк.(в моем случае это 3 строки)

Когда я печатаю это сообщение, я получаю вывод ниже:

enter image description here

Сообщение "Text1 "," Text2 "и" Text3 "Я хочу в 3 строки во фрейме данных, чтобы я мог обрабатывать дальше.

Пожалуйста, помогите мне.

1 Ответ

0 голосов
/ 22 февраля 2019

Вы можете использовать пользовательскую функцию (UDF) для преобразования строки сообщения в последовательность строк, а затем применить функцию explode к этому столбцу, чтобы создать новуюстрока для каждого элемента в последовательности:

Как показано ниже (в Scala тот же принцип применяется к pyspark):

case class KafkaMessage(offset: Long, message: String)

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode

val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()

val splitString = udf { s: String => s.split('\n') }

df.withColumn("splitMsg", explode(splitString($"message")))
  .select("offset", "splitMsg")
  .show()

это приведет к следующему выводу:

+------+--------+
|offset|splitMsg|
+------+--------+
|  1000|   Text1|
|  1000|   Text2|
|  1000|   Text3|
+------+--------+
...