Как правильно выбрать тип данных в паркете с помощью Spark от CSV с pyspark - PullRequest
0 голосов
/ 14 ноября 2018

У меня есть CSV-файл, который выглядит примерно так:

39813458,13451345,14513,SomeText,344564,Some other text,328984,"[{""field_int_one"":""16784832510"",""second_int_field"":""84017"",""third_int_field"":""245"",""some_timestamp_one"":""2018-04-17T23:54:34.000Z"",""some_other_timestamp"":""2018-03-03T15:34:04.000Z"",""one_more_int_field"":0,},{""field_int_one"":""18447548326"",""second_int_field"":""04965"",""third_int_field"":""679"",""some_timestamp_one"":""2018-02-06T03:39:12.000Z"",""some_other_timestamp"":""2018-03-01T09:19:12.000Z"",""one_more_int_field"":0}]"

Я превращаю его в паркет с

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

if __name__ == "__main__":
    sqlContext = SQLContext(sc)

    schema = StructType([
              StructField("first_int", IntegerType(), True),
              StructField("second_int", IntegerType(), True),
              StructField("third_int", IntegerType(), True),
              StructField("first_string_field", StringType(), True),
              StructField("fourth_int", IntegerType(), True),
              StructField("second_string_field", StringType(), True),
              StructField("last_int_field", StringType(), True),
              StructField("json_field", StringType(), True)])

    rdd = spark.read.schema(schema).csv("source_file.csv")
    rdd.write.parquet('parquet_output')

Он работает и преобразует его, но если вы делаете .printSchema, как только вы запрашиваете его, он, очевидно, печатает его определение как String. Как я могу правильно объявить это последнее поле как Json?

1 Ответ

0 голосов
/ 14 ноября 2018

Я думаю, что вложенная ArrayType будет работать для этого типа схемы

schema = StructType([
          StructField("first_int", IntegerType(), True),
          StructField("second_int", IntegerType(), True),
          StructField("third_int", IntegerType(), True),
          StructField("first_string_field", StringType(), True),
          StructField("fourth_int", IntegerType(), True),
          StructField("second_string_field", StringType(), True),
          StructField("last_int_field", StringType(), True),
          StructField("json_field", ArrayType(
                StructType() \
                   .add("field_int_one", IntegerType()) \
                   .add("field_string_one", StringType()) \
                   .addMoreFieldsHere), 
          True)])
...