Чтение данных с использованием настроенной схемы - PullRequest
1 голос
/ 30 января 2020

У меня есть файл с разделителями трубы без заголовков, и я хочу прочитать его в Spark.

Данные выглядят следующим образом:

100005072756|R|OTHER|8|125000|360|11/2000|01/2001|95|95|1|29|694|N|P|SF|1|P|MI|492|30|FRM||1|N
100006905529|C|FLAGSTAR BANK, FSB|7.875|99000|360|12/2000|02/2001|80|80|1|55|689|N|C|SF|1|P|CA|923||FRM|||N
100010275736|R|JPMORGAN CHASE BANK, NA|7.625|140000|360|12/2000|02/2001|21|21|1|12|796|N|R|SF|1|P|CO|803||FRM|||N
100015654434|B|FLAGSTAR BANK, FSB|7.625|102000|360|03/2001|05/2001|80|80|2|32|706|N|C|SF|1|P|WI|541||FRM|661||N
100020827204|R|OTHER|7.125|214000|360|01/2001|03/2001|80|80|1|43|731|N|P|SF|4|P|IL|604||FRM|||N
100027347659|R|OTHER|8|195000|360|12/2000|02/2001|74|74|2|42|720|N|C|SF|1|P|VA|236||FRM|696||N
100037532932|R|BANK OF AMERICA, N.A.|7.75|228000|360|12/2000|02/2001|95|95|2|38|642|N|P|SF|1|P|AL|352|30|FRM|664|2|N
100040777887|C|CITIMORTGAGE, INC.|7|145000|360|01/2001|03/2001|71|71|2|20|752|N|P|SF|1|P|CA|953||FRM|786||N
100041189347|R|FLEET NATIONAL BANK|7.875|150000|240|10/2000|12/2000|85|85|2|34|734|N|C|SF|1|P|CO|810|6|FRM||1|N
100043437231|R|JPMORGAN CHASE BANK, NA|6.875|275000|240|12/2000|02/2001|71|71|1|41|765|N|P|PU|1|P|VA|201||FRM|||N

Это код, который я использую:

origSchema = StructType([
                         StructField("loan_id", StringType(), True),
                         StructField("origination_channel", StringType(), True),
                         StructField("seller_name", StringType(), True),
                         StructField("original_interest_rate", DoubleType(), True),
                         StructField("original_upb", DoubleType(), True),
                         StructField("original_loan_term", IntegerType(), True),
                         StructField("origination_date", DateType(), True),
                         StructField("first_payment_date", DateType(), True),
                         StructField("original_ltv", IntegerType(), True),
                         StructField("original_cltv", IntegerType(), True),
                         StructField("number_of_borrowers", IntegerType(), True),
                         StructField("original_dti", IntegerType(), True),
                         StructField("borrower_fico_at_origination", IntegerType(), True),
                         StructField("first_time_home_buyer_indicator", StringType(), True),
                         StructField("loan_purpose", StringType(), True),
                         StructField("property_type", StringType(), True),
                         StructField("number_of_units", StringType(), True),
                         StructField("occupancy_type", StringType(), True),
                         StructField("property_state", StringType(), True),
                         StructField("zip_code_short", StringType(), True),
                         StructField("primary_mortgage_insurance_percent", IntegerType(), True),
                         StructField("product_type", StringType(), True),
                         StructField("coborrower_fico_at_origination", IntegerType(), True),
                         StructField("mortgage_insurance_type", StringType(), True),
                         StructField("relocation_mortgage_indicator", StringType(), True)
])

df = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt", format="csv", sep="|", header="false", schema=origSchema)

Проверяя фрейм данных:

+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
|loan_id|origination_channel|seller_name|original_interest_rate|original_upb|original_loan_term|origination_date|first_payment_date|original_ltv|original_cltv|number_of_borrowers|original_dti|borrower_fico_at_origination|first_time_home_buyer_indicator|loan_purpose|property_type|number_of_units|occupancy_type|property_state|zip_code_short|primary_mortgage_insurance_percent|product_type|coborrower_fico_at_origination|mortgage_insurance_type|relocation_mortgage_indicator|
+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
only showing top 5 rows

Не похоже, что я прочитал их правильно.

Но если я удалил опцию схемы, все будет хорошо, но так как у данных не было заголовков, фрейм данных выглядел не так, как ожидалось.

df = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt", format="csv", sep="|", header="false")
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|         _c0|_c1|                 _c2|  _c3|   _c4|_c5|    _c6|    _c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|100005072756|  R|               OTHER|    8|125000|360|11/2000|01/2001| 95| 95|   1|  29| 694|   N|   P|  SF|   1|   P|  MI| 492|  30| FRM|null|   1|   N|
|100006905529|  C|  FLAGSTAR BANK, FSB|7.875| 99000|360|12/2000|02/2001| 80| 80|   1|  55| 689|   N|   C|  SF|   1|   P|  CA| 923|null| FRM|null|null|   N|
|100010275736|  R|JPMORGAN CHASE BA...|7.625|140000|360|12/2000|02/2001| 21| 21|   1|  12| 796|   N|   R|  SF|   1|   P|  CO| 803|null| FRM|null|null|   N|
|100015654434|  B|  FLAGSTAR BANK, FSB|7.625|102000|360|03/2001|05/2001| 80| 80|   2|  32| 706|   N|   C|  SF|   1|   P|  WI| 541|null| FRM| 661|null|   N|
|100020827204|  R|               OTHER|7.125|214000|360|01/2001|03/2001| 80| 80|   1|  43| 731|   N|   P|  SF|   4|   P|  IL| 604|null| FRM|null|null|   N|
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
only showing top 5 rows

Не могли бы вы помочь мне с трудом снять мой код, почему он не загружал данные правильно при использовании пользовательских схема?

Большое спасибо.

Ответы [ 2 ]

1 голос
/ 30 января 2020

Это столбцы с форматированием даты, которые убивают ее. Вместо этого установите для них StringType, а затем исправьте их после того, как у вас будет кадр данных. Он не сможет проанализировать ваши даты без дополнительной информации.

from pyspark.sql.types import *
origSchema = StructType([
                         StructField("loan_id", StringType(), True),
                         StructField("origination_channel", StringType(), True),
                         StructField("seller_name", StringType(), True),
                         StructField("original_interest_rate", DoubleType(), True),
                         StructField("original_upb", DoubleType(), True),
                         StructField("original_loan_term", IntegerType(), True),
                         StructField("origination_date", StringType(), True),
                         StructField("first_payment_date", StringType(), True),
                         StructField("original_ltv", IntegerType(), True),
                         StructField("original_cltv", IntegerType(), True),
                         StructField("number_of_borrowers", IntegerType(), True),
                         StructField("original_dti", IntegerType(), True),
                         StructField("borrower_fico_at_origination", IntegerType(), True),
                         StructField("first_time_home_buyer_indicator", StringType(), True),
                         StructField("loan_purpose", StringType(), True),
                         StructField("property_type", StringType(), True),
                         StructField("number_of_units", StringType(), True),
                         StructField("occupancy_type", StringType(), True),
                         StructField("property_state", StringType(), True),
                         StructField("zip_code_short", StringType(), True),
                         StructField("primary_mortgage_insurance_percent", IntegerType(), True),
                         StructField("product_type", StringType(), True),
                         StructField("coborrower_fico_at_origination", IntegerType(), True),
                         StructField("mortgage_insurance_type", StringType(), True),
                         StructField("relocation_mortgage_indicator", StringType(), True)
])

df = spark.read.option("inferSchema", True).option("sep", "|").format("csv").load("/path/to/data/name_of_file.txt")
0 голосов
/ 30 января 2020

Когда схема применяется принудительно, spark возвращает нулевые значения для записей, которые не основаны на данной схеме.

В spark csv есть 3 режима анализа файлов:

По умолчанию это PERMISSIVE. Возможные значения:

  1. PERMISSIVE: попытаться проанализировать все строки: для отсутствующих токенов вставляются нули, а дополнительные токены игнорируются. Если запись искажена, тогда для всей записи будут установлены нулевые значения. например, несоответствие типа или формата данных.

  2. DROPMALFORMED: отбросить строки, которые содержат меньше или больше токенов, чем ожидалось, или токены, которые не соответствуют схеме.

  3. FAILFAST: прервать с RuntimeException, если обнаружена какая-либо искаженная строка.

Вы не устанавливаете никакой режим, поэтому получаете поведение по умолчанию.

Если вы заинтересованы в правильном отображении имен столбцов, сделайте все столбцы в схеме для StringType.

Как только вы увидите, что данные выглядят корректно, отформатируйте их (например, столбцы даты) и приведите их при необходимости.

...