Как создать DataFrame с пользовательской схемой в Spark - PullRequest
0 голосов
/ 13 января 2020

Я хотел создать в DataFrame с указанной схемой в Python. Вот процесс, который я проделал до сих пор.

  1. У меня есть файл Sample.parm, где я определил схему, как показано ниже: Account_type, string, True

  2. Я написал python скрипт sample.py, чтобы прочитать файл sample.parm, сгенерировать схему на основе файла sample.parm, а затем сгенерировать фрейм данных на основе определенной пользователем схемы.

d

def schema():
    with open('<path>/sample.parm','r') as parm_file:
        reader=csv.reader(parm_file,delimiter=",")
        filteredSchema = []
        for fieldName in reader:
            if fieldName[1].lower() == "decimal":
               filteredSchema.append([fieldName[0], DecimalType(),fieldName[2]])
            elif fieldName[1].lower() == "string":
               filteredSchema.append([fieldName[0], StringType(),fieldName[2]])
            elif fieldName[1].lower() == "integer":
               filteredSchema.append([fieldName[0], IntegerType(),fieldName[2]])
            elif fieldName[1].lower() == "date":
               filteredSchema.append([fieldName[0], DateType(),fieldName[2]])
            elif fieldName[1].lower() == "byte":
               filteredSchema.append([fieldName[0], ByteType(),fieldName[2]])
            elif fieldName[1].lower() == "boolean":
               filteredSchema.append([fieldName[0], BooleanType(),fieldName[2]])
            elif fieldName[1].lower() == "short":
               filteredSchema.append([fieldName[0], ShortType(),fieldName[2]])
            elif fieldName[1].lower() == "long":
               filteredSchema.append([fieldName[0], LongType(),fieldName[2]])
            elif fieldName[1].lower() == "double":
               filteredSchema.append([fieldName[0], DoubleType(),fieldName[2]])
            elif fieldName[1].lower() == "float":
               filteredSchema.append([fieldName[0], FloatType(),fieldName[2]])
            elif fieldName[1].lower() == "timestamp":
               filteredSchema.append([fieldName[0], TimestampType(),fieldName[2]])
 struct_schema = [StructField(line[0], line[1], line[2]) for line in filteredSchema]
 schema=StructTpe(struct_schema)
 return schema

def create_dataframe(path):
    val=spark.read.schema(schema()).csv(path, sep='\t')
    print(val.take(1))

но получаю ошибку вроде: pyspark.sql.utils.IllegalArgumentException: u'Failed to convert the JSON string \'{"metadata":{},"name":"account_type","nullable":"True","type":"string"}\' to a field.'

Не могли бы вы кому-нибудь помочь мне разобраться? ценим вашу помощь

1 Ответ

0 голосов
/ 14 января 2020

Я думаю, JSON сборка неправильная - метаданные пустые, "тип" и "поле" отсутствуют. Попробуйте следующую JSON для вашей схемы.

{"type":"struct","fields":[{"name":"account_type","type":"string","nullable":true,"metadata":{"name":"account_type","scale":0}}]}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...