Новое в Pyspark - импорт CSV и создание файла паркета со столбцами массива - PullRequest
0 голосов
/ 22 марта 2019

Я новичок в Pyspark, и я дергал себя за волосы, пытаясь достичь чего-то, что я считаю довольно простым.Я пытаюсь сделать процесс ETL, где CSV-файл преобразуется в файл паркета.Файл CSV имеет несколько простых столбцов, но один столбец представляет собой массив целых чисел с разделителями, который я хочу развернуть / распаковать в файл паркета.Этот файл паркета фактически используется микросервисом ядра .net, который использует Parquet Reader для выполнения вычислений в нисходящем направлении.Чтобы не усложнять этот вопрос, структура столбца:

"geomap" 5: 3: 7 | 4: 2: 1 | 8: 2: 78 -> это представляет собой массив из 3 элементов, эторазделен на "|"Затем выполняется сбор значений (5,3,7), (4,2,1), (8,2,78)

. Я пробовал различные процессы и схемы и не могуполучите это правильно.С помощью UDF я создаю либо список списков, либо список кортежей, но я не могу получить правильную схему или распаковать данные в операцию записи в паркет.Я либо получаю нули, ошибку или другие проблемы.Нужно ли подходить к этому по-другому?Соответствующий код ниже.Я просто показываю столбец проблемы для простоты, так как у меня все остальное работает.Это моя первая попытка Pyspark, поэтому извиняюсь за то, что упустил что-то очевидное:

def convert_geo(geo):
   return [tuple(x.split(':')) for x in geo.split('|')]

compression_type = 'snappy'

schema = ArrayType(StructType([
    StructField("c1", IntegerType(), False),
    StructField("c2", IntegerType(), False),
    StructField("c3", IntegerType(), False)
]))

spark_convert_geo = udf(lambda z: convert_geo(z),schema)

source_path = '...path to csv'
destination_path = 'path for generated parquet file'

df = spark.read.option('delimiter',',').option('header','true').csv(source_path).withColumn("geomap",spark_convert_geo(col('geomap')).alias("geomap"))
df.write.mode("overwrite").format('parquet').option('compression', compression_type).save(destination_path)

РЕДАКТИРОВАТЬ: По запросу, добавляя вывод printSchema (), я тоже не уверен, что здесь не так.Я до сих пор не могу получить значения разделения строки, чтобы показать или визуализировать должным образом.Это содержит все столбцы.Я вижу имена структур c1 и c2 и c3 ...

root |-- lrsegid: integer (nullable = true) |-- loadsourceid: integer (nullable = true) |-- agencyid: integer (nullable = true) |-- acres: float (nullable = true) |-- sourcemap: array (nullable = true) | |-- element: integer (containsNull = true) |-- geomap: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- c1: integer (nullable = false) | | |-- c2: integer (nullable = false) | | |-- c3: integer (nullable = false) 

1 Ответ

0 голосов
/ 22 марта 2019

Проблема в том, что функция convert_geo возвращает список кортежей с символьными элементами, а не с целыми числами, как указано в схеме.Если вы измените его следующим образом, он будет работать:

def convert_geo(geo):
    return [tuple([int(y) for y in x.split(':')]) for x in geo.split('|')]
...