используя pyspark, как отклонить неверные (искаженные) записи из CSV-файла и сохранить эти отклоненные записи в новом файле - PullRequest
0 голосов
/ 15 января 2019

Я использую pyspark для загрузки данных из файла CSV в кадр данных, и мне удалось загрузить данные при удалении искаженных записей, но как я могу отклонить эти плохие (искаженные) записи из файла CSV и сохранить эти отклоненные записи в новый файл?

1 Ответ

0 голосов
/ 15 января 2019

Вот идея, хотя я не очень рад этому. Как вы знаете, синтаксический анализатор CSV имеет различные режимы для сброса искаженных данных. Однако, если режим не указан, он «заполняет пробелы» значением по умолчанию null. Вы можете использовать это в своих интересах.

Используя эти данные и предполагая, что столбец article_id не может быть обнуляемым по проекту:

1,abcd,correct record1,description1 haha
Bad record,Bad record description
3,hijk,another correct record,description2
Not_An_Integer,article,no integer type,description

Вот код:

#!/usr/bin/env python
# coding: utf-8

import pyspark
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F

sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

# Load the data with your schema, drop the malformed information
schema = StructType([ StructField("article_id", IntegerType()), 
                     StructField("title", StringType()), 
                     StructField("short_desc", StringType()), 
                     StructField("article_desc", StringType())]) 
valid_data = spark.read.format("csv").schema(schema).option("mode","DROPMALFORMED").load("./data.csv")
valid_data.show()

"""
+----------+-----+--------------------+-----------------+
|article_id|title|          short_desc|     article_desc|
+----------+-----+--------------------+-----------------+
|         1| abcd|     correct record1|description1 haha|
|         3| hijk|another correct r...|     description2|
+----------+-----+--------------------+-----------------+
"""

# Load the data and let spark infer everything
malformed_data = spark.read.format("csv").option("header", "false").load("./data.csv")
malformed_data.show()

"""
+--------------+--------------------+--------------------+-----------------+
|           _c0|                 _c1|                 _c2|              _c3|
+--------------+--------------------+--------------------+-----------------+
|             1|                abcd|     correct record1|description1 haha|
|    Bad record|Bad record descri...|                null|             null|
|             3|                hijk|another correct r...|     description2|
|Not_An_Integer|             article|     no integer type|      description|
+--------------+--------------------+--------------------+-----------------+
"""

# Join and keep all data from the 'malformed' DataFrame.
merged = valid_data.join(malformed_data, on=valid_data.article_id == malformed_data._c0, how="right")

# Filter those records for which a matching with the 'valid' data was not possible
malformed = merged.where(F.isnull(merged.article_id))
malformed.show()

"""
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
|article_id|title|short_desc|article_desc|           _c0|                 _c1|            _c2|        _c3|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
|      null| null|      null|        null|    Bad record|Bad record descri...|           null|       null|
|      null| null|      null|        null|Not_An_Integer|             article|no integer type|description|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
"""

Мне это не очень нравится, так как он очень чувствителен к тому, как Spark анализирует CSV, и он может работать не для всех файлов, но вы можете найти его полезным.

...