Как столбец строки в DataFrame разбивается на несколько столбцов при потоковой передаче Spark - PullRequest
2 голосов
/ 20 апреля 2020

Это текущий код:

from pyspark.sql import SparkSession

park_session = SparkSession\
    .builder\
    .appName("test")\
    .getOrCreate()

lines = spark_session\
    .readStream\
    .format("socket")\
    .option("host", "127.0.0.1")\
    .option("port", 9998)\
    .load()

The 'lines' looks like this:
+-------------+
|    value    |
+-------------+
|     a,b,c   |
+-------------+

But I want to look like this:
+---+---+---+
| a | b | c |
+---+---+---+

Я пытался использовать метод split (), но он не сработал. Вы можете только разбить каждую строку на список в столбце, а не на несколько столбцов

Что мне делать?

Ответы [ 3 ]

1 голос
/ 20 апреля 2020

Если у вас разное количество разделителей, а не только 3 для каждой строки, вы можете использовать следующее:

Ввод:

+-------+
|value  |
+-------+
|a,b,c  |
|d,e,f,g|
+-------+

Решение

import pyspark.sql.functions as F

max_size = df.select(F.max(F.length(F.regexp_replace('value','[^,]','')))).first()[0]
out = df.select([F.split("value",',')[x].alias(f"Col{x+1}") for x in range(max_size+1)])

Выход

out.show()

+----+----+----+----+
|Col1|Col2|Col3|Col4|
+----+----+----+----+
|   a|   b|   c|null|
|   d|   e|   f|   g|
+----+----+----+----+
0 голосов
/ 20 апреля 2020
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark_session = SparkSession\
    .builder\
    .appName("test")\
    .getOrCreate()

lines = spark_session\
    .readStream\
    .format("socket")\
    .option("host", "127.0.0.1")\
    .option("port", 9998)\
    .load()

split_col = f.split(lines['value'], ",")
df = df.withColumn('col1', split_col.getItem(0))
df = df.withColumn('col2', split_col.getItem(1))
df = df.withColumn('col2', split_col.getItem(2))

df.show()
0 голосов
/ 20 апреля 2020

Split столбец значений и путем доступа к array index (или) element_at(from spark-2.4) (или) getItem() функциям для создания новых столбцов.


from pyspark.sql.functions import *

lines.withColumn("tmp",split(col("value"),',')).\
withColumn("col1",col("tmp")[0]).\
withColumn("col2",col("tmp").getItem(1)).\
withColumn("col3",element_at(col("tmp"),3))
drop("tmp","value").\
show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|   a|   b|   c|
#+----+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...