Разработка приложения для проверки качества данных в Python - PullRequest
1 голос
/ 09 июля 2019

Я занимаюсь разработкой приложения, которое выполняет проверки качества данных для входных файлов и регистрирует их на основе сообщений о сбоях DQ в данных. Имеет ли смысл подход, который я использую, или порекомендовал бы лучший способ сделать это?

Я пытаюсь написать приложение на Python, которое будет фиксировать ошибки DQ в данных и собирать данные. Я мог бы использовать Pandas, Numpy для этого, однако, поскольку объем данных огромен ~ 100 ГБ, я решил сделать это через Spark. Это мое третье приложение на Python, поэтому, хотя я могу писать в нем код, я не в себе, если это действительно оптимальный способ сделать это.

Итак, чтобы подвести итог, я читаю несколько CSV-файлов и создаю один файл Parquet поверх него, а затем создаю временное представление, к которому я могу обратиться, чтобы найти проблемы с DQ. Затем я фиксирую результаты запроса в переменной, а затем записываю его в список. Этот список позже используется для записи CSV, который становится входом для отчета панели мониторинга. Код ниже.

# Importing required libraries
import time,datetime
from pyspark.sql import SparkSession

# Initiating Spark Session
spark = SparkSession.builder.appName("DQ-checks").getOrCreate()

# Initializing Variables
time1 = datetime.datetime.now()
src_file_01 = r'\All kinds of data files\craigslistVehicles.csv'
target_filename = r'\All kinds of data files\craigslistVehicles.parquet'

# Read the CSV file through Spark
df = spark.read.csv(src_file_01, header="true", inferSchema="true")

# Update code to make it flexible enough to read multiple files

# Write the contents of the CSV file into a Parquet file
df.write.format("parquet").save(target_filename, mode="Overwrite")
print("created a parquet file")

# Create a temporary view over the Parquet file to query data
df2 = spark.read.parquet(target_filename)
df2.createOrReplaceTempView("craigslistVehicles")

# Create a column list from the header of the Spark View
column_list = df2.columns
print(column_list)

# Capturing time before start of the query for benchmarking
time2 = datetime.datetime.now()
result_store = []
# Iterate through all the columns and capture null counts for each column
rule_type = 'Null Check'
results={}
for column_names in column_list:
    query = "Select count(*) from craigslistVehicles where {} is null".format(column_names)
#    print(query)
    df3 = spark.sql(query).collect()
    for i in df3:
        results.update(i.asDict())
        res_in_num=results['count(1)']
    result_store=[rule_type,column_names,res_in_num]
    print (result_store)

# Next Steps - Update code to add more data quality checks based on requirement.


# Next Steps - Insert results of the queries into a spark table that can be used as a log and becomes an input for a dashboard report.


# Capturing time after end of the query for benchmarking
time3 = datetime.datetime.now()
print("Query time is.{}",time3-time2)
print("Total Job run time is.{}",time3-time1)

# Spark Session Stop
spark.stop()

В настоящее время это работает. Я могу обработать файл размером 1,1 ГБ менее чем за минуту.

Мои вопросы -

Имеет ли этот дизайн смысл? Как бы вы это сделали, если бы вам пришлось это сделать? Есть ли что-то очевидное, что я могу изменить, чтобы сделать код более чистым?

...