Превратите Spark Stream из Socket в DataFrame - PullRequest
0 голосов
/ 20 марта 2019

Я установил сокет-соединение с моим SparkSession, который отправляет строку .csv-файла в мой поток.

Пока мой (PySpark-) код выглядит так:

stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 5555).load()

stream.writeStream.format('console').start().awaitTermination()

Это печатает строки файла .csv в одном столбце следующим образом:

+-----------------+
|            value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+

Но то, что я действительно хотел бы иметь, это:

+-----+-----+-----+
| col1| col2| col3|
+-----+-----+-----+
|    2|C4653|C5030|
+-----+-----+-----+

Я хотел бы использовать это как DataFrame для подачи ML-конвейера.

Как я могу обработать входящие данные потока?

1 Ответ

1 голос
/ 20 марта 2019

У вас уже есть фрейм данных stream , который просто нужно изменить схему.

Просто добавьте это преобразование после load () call:

 stream.selectExpr("split(value, ' ')[0] as col1","split(value, ' ')[1] as col2", "split(value, ' ')[2] as col3")
...