Как добавить заголовки в PySpark DataFrame? - PullRequest
0 голосов
/ 11 мая 2019

Я создал PyDpark RDD (преобразованный из XML в CSV), у которого нет заголовков.Мне нужно преобразовать его в DataFrame с заголовками, чтобы выполнить некоторые запросы SparkSQL.Я не могу найти простой способ добавить заголовки.Большинство примеров начинаются с набора данных, у которого уже есть заголовки.

    df = spark.read.csv('some.csv', header=True, schema=schema)

Однако мне нужно добавить заголовки.

    headers = ['a', 'b', 'c', 'd']

Это кажется тривиальной проблемой, я не уверен, почемуЯ не могу найти рабочее решение.Спасибо.

1 Ответ

0 голосов
/ 11 мая 2019

Как это ... вам нужно указать схему и .option("header", "false"), если ваш CSV не содержит строку заголовка

spark.version
'2.3.2'

! cat sample.csv

1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

PATH = "sample.csv"

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([\
    StructField("col1", IntegerType(), True),\
    StructField("col2", FloatType(), True),\
    StructField("col3", StringType(), True)])

csvFile = spark.read.format("csv")\
.option("header", "false")\
.schema(schema)\
.load(PATH)

csvFile.show()

+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|          hello|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

# if you have rdd and want to convert straight to df
rdd = sc.textFile(PATH)

# just showing rows
for i in rdd.collect(): print(i)
1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

# use Row to construct a schema from rdd
from pyspark.sql import Row

csvDF = rdd\
    .map(lambda x: Row(col1 = int(x.split(",")[0]),\
                       col2 = float(x.split(",")[1]),\
                       col3 = str(x.split(",")[2]))).toDF()

csvDF.show()
+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|        "hello"|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

csvDF.printSchema()
root
 |-- col1: long (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: string (nullable = true)

...