Чтение CSV-файла в PySpark Streaming с другой схемой в одном и том же файле - PullRequest
0 голосов
/ 28 мая 2018

У меня есть CSV-файл с разными длинами в строке, например:

left, 10, xdfe, 8992, 0.231
left, 10, xdfk, 8993, 2.231
right, 20, adfk, 8993, 2.231, DDT, 10, 10
right, 30, dfk, 923, 2.231, ADD, 10, 20
center, 923, 2.231, 10, 20
right, 34, efk, 326, 6.21, DDD, 20, 40

, где строки, начинающиеся с ключевого слова, left, right и center имеют одинаковую длину(например, строки left всегда имеют ту же длину, что и другие строки left).

Я хочу прочитать эти файлы, используя spark.readStream.csv, выполнить некоторые преобразования, которые могут зависеть от типа строки, и записать результаты в паркет.Есть ли способ использовать разные схемы, основанные на значении первого столбца каждой строки?

1 Ответ

0 голосов
/ 29 мая 2018

Нет, вы не можете использовать несколько схем для одного файла.Лучшее, что вы можете сделать, это использовать схему для самой длинной строки и установить mode в PERMISSIVE, это даст нулевые значения в пропущенных столбцах для более коротких строк.

К сожалению, это означает, что имена типов и столбцов будут разными, если отсутствующие столбцы находятся не в конце строки.Например, третий столбец представляет собой строку для строк right, а может быть плавающей для строк center (похоже, это должен быть пятый столбец).Одним из способов было бы прочитать все как строки и затем выполнить преобразование, но в зависимости от данных некоторые столбцы могут быть прочитаны, например, как float.

schema = StructType().add("a", "string").add("b", "string") \
    .add("c", "string").add("d", "string").add("e", "string") \
    .add("f", "string").add("g", "string").add("h", "string")

df = spark \
    .readStream \
    .option("mode", "PERMISSIVE") \
    .schema(schema) \
    .csv("/path/to/directory")

После того, как это будет сделано, можно сделать некоторые преобразованияданные, чтобы получить правильно выглядящий кадр данных.Ниже приведен код на Scala, но он должен быть легко преобразован в python и адаптирован к фактическим потребностям:

val df2 = df.select($"a", 
    when($"a" === "center", null).otherwise($"b").cast(FloatType).as("b"),
    when($"a" === "center", null).otherwise($"c").as("c"),
    when($"a" === "center", $"b").otherwise($"d").cast(FloatType).as("d"),
    when($"a" === "center", $"c").otherwise($"e").cast(FloatType).as("e"),
    $"f", $"g", $"h")

Окончательный результат:

+------+----+-----+------+-----+----+----+----+
|     a|   b|    c|     d|    e|   f|   g|   h|
+------+----+-----+------+-----+----+----+----+
|  left|10.0| xdfe|8992.0|0.231|null|null|null|
|  left|10.0| xdfk|8993.0|2.231|null|null|null|
| right|20.0| adfk|8993.0|2.231| DDT|  10|  10|
| right|30.0|  dfk| 923.0|2.231| ADD|  10|  20|
|center|null| null| 923.0|2.231|null|null|null|
| right|34.0|  efk| 326.0| 6.21| DDD|  20|  40|
+------+----+-----+------+-----+----+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...