Я пытаюсь преобразовать значение ключа RDD в DataFrame Pyspark с помощью метода sqlContext.createDataFrame (). Если я не объявляю схему явно, я получаю сообщение об ошибке, потому что некоторые столбцы из RDD имеют нулевые значения.
Вот что я запускаю в оболочке Pyspark. Все работает как положено, пока я не попытаюсь взять Pandas фрейм данных, который у меня есть в Pyspark, и попытаться преобразовать его в Spark Dataframe.
import parser
import pandas as pd
import flat_table
import itertools
import importlib
import pathlib
import re
import os
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import array, col, explode, struct, lit
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
subdirectory = "/path/to/directory"
fs = FileSystem.get(URI("hdfs://localfilesystem:port"), Configuration())
status = fs.globStatus(Path(subdirectory))
def fuse_sections(document_data: dict) -> DataFrame:
all_section_data = pd.concat(document_data)
return all_section_data
declared_schema = StructType([StructField('name', BinaryType(), True),
StructField('date', BinaryType(), True)])
for fileStatus in status:
pathname = str(fileStatus.getPath())
xml_rdd = sc.wholeTextFiles(pathname, use_unicode=False)
tree_rdd = xml_rdd.mapValues(parser)
if tree_rdd.isEmpty() == True:
pass
else:
parsed_rdd = data_dict_rdd.mapValues(fuse_sections)
values_rdd = parsed_rdd.values()
#The following print statement shows that the values_rdd is compiling as expected
print(values_rdd.collect())
spark_df = sqlContext.createDataFrame(values_rdd, declared_schema)
#The following dataframe shows all nulls
spark_df.show(200)
Является ли тот факт, что я пытаюсь go из Pandas Dataframe в RDD в Spark Dataframe, где он портится и приводит ко всем нулям?