Azure Блоки данных, невозможно прочитать разделенный запятыми файл CSV с вложенными списками - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть один довольно большой файл CSV с разделителями-запятыми (12 ГБ). У меня есть 4 столбца, 1 из них содержит вложенные списки с jsons. Я могу создать соединение из Excel, и оно правильно его читает (несмотря на то, что у меня там есть несколько вложенных списков, что означает больше запятых). Тем не менее, когда я пытаюсь сделать это с помощью spark, при каждом появлении запятой он разбивается на части, что создает много беспорядка.

Хорошо, поэтому я попытался предоставить схему. Очевидно, CSV не поддерживает тип массива, поэтому я не могу сделать это так легко. Я могу определить схему со строкой вместо массива, но тогда мой последний столбец будет выглядеть так:

enter image description here

Итак, я получаю все, что предшествует первой запятой, отдых пропал.

Я пытался прочитать его как RDD с s c .textFile, но затем структуры json сломались.

Я пытаюсь написать функцию, которая исправит мои jsons, но это довольно разочаровывает, поэтому я подумал, что, может быть, есть какой-то более простой способ?

Я знаю, что это может быть глупый вопрос, но я просто не понимаю, почему что-то такое простое в Excel быть таким сложным в искре?

Спасибо за любые советы!

РЕДАКТИРОВАТЬ Excel: enter image description here

Искра: enter image description here

То же самое со схемой, определенной следующим образом:

user_schema = StructType([
    StructField("businessID", StringType(), True), 
    StructField("vid", StringType(), True),
    StructField("company_name", StringType(), True),
    StructField("financial_statements", StringType(), True)
])

Я также попытался изменить файл на паркет, используя ADF, чтобы поймать схему, но она тоже не работает. Хотя я предоставил один из образца. АПД: enter image description here

1 Ответ

0 голосов
/ 01 мая 2020

Попробуйте [ как option

val schema = StructType(
  Seq(
    StructField("businessID", StringType),
    StructField("vid", StringType),
    StructField("company_name", StringType),
    StructField("financial_statements", StringType),
  ),
)

val df = spark.read
  .schema(schema)
  .format("csv")
  .option("quote", "[")
  .load(path)

val jsonSchema = StructType(
  Seq(
    StructField("key1", StringType),
    StructField("key2", IntegerType),
    StructField("key3", BooleanType),
  ),
)

val dfClean = df.withColumn("financial_statements_json", from_json($"financial_statements", jsonSchema))

dfClean.printSchema()
dfClean.show(false)

Вывод:

+----------+---+------------+-------------------------------------------+-------------------------+
|businessID|vid|company_name|financial_statements                       |financial_statements_json|
+----------+---+------------+-------------------------------------------+-------------------------+
|1         |A  |AAA         |{"key1": "va1", "key2": 123, "key3": true}]|[va1, 123, true]         |
+----------+---+------------+-------------------------------------------+-------------------------+

...