AZure DataBricks - проверка полей большого файла CSV - PullRequest
1 голос
/ 18 января 2020

Я новичок в DataBricks. Моя задача состоит в том, чтобы прочитать несколько больших файлов CSV (размером до 1 гигабайта), проверить и очистить все поля, готовые для считывания многоосновной базы в Azure DW. Файлы хранятся в blob.

Я думал, что DatBricks и Python будут подходом, который даст ощутимую производительность.

Я использовал пример QuickStart, показанный ниже, в качестве отправной точки: https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal

Я хочу выполнить несколько чистящих замен для каждого поля, а также запустить Regex, чтобы отфильтровать любые другие нежелательные символы и, наконец, обрезать, чтобы удалить завершающие пробелы. Ниже приведен фрагмент тестового примера, который дает представление о типе проверки, которую я буду выполнять sh. В этом примере используется udf для перевода значения, а затем регулярное выражение для фильтрации нежелательных символов, в примере, показанном в ссылке.

import pyspark.sql.functions as f

def udf_clean (s):
  return (f.translate(s,'3','B'))

df.filter(df.category=='Housing').select(df[1],f.trim(f.regexp_replace(udf_clean(df[1]),'(\d+)',''))).show()

Я не могу выяснить, как я могу выполнить эти переводы в целом dataframe. Я хотел бы очистить весь фрейм данных за один проход. Так как он основан на векторах, я чувствую, что не нужно перебирать его по очереди, а просто выполнять какие-то операции в целом. Я понимаю, как выполнить итерацию строки, как в

`for row in df.rdd.collect():
     do_something(row)`

.., но я чувствую, что должен быть в состоянии сделать что-то более эффективно для всего набора полей. Правильно ли это мышление, и есть ли у кого-нибудь примеры, пожалуйста? Большое спасибо, Ричард

Результирующий код, но не ответ

Я не нашел ответа на этот вопрос, но я хотел бы опубликовать свой код, который, как вы увидите, не элегантен, но работает .

from pyspark.sql import functions as f
from pyspark.sql.functions import regexp_replace, udffrom pyspark.sql.functions import translate, udf

from pyspark.sql.functions import trim, udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction


def udf_regexclean(s):
  return trim(regexp_replace(s,'([^\p{L}\p{Nd} ''@.():_*\-&+\/,])',''))

def udf_regexReplace(s):
  return regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(s,'£','GBR'),'’',''),'  ',''),"'t",''),'É', 'E')


df1=df.select( udf_regexclean(udf_regexReplace(df[0 ]))
,udf_regexclean(udf_regexReplace(df[1   ]))
,udf_regexclean(udf_regexReplace(df[2   ]))
,udf_regexclean(udf_regexReplace(df[3   ]))
,udf_regexclean(udf_regexReplace(df[4   ]))
,udf_regexclean(udf_regexReplace(df[5   ]))
,udf_regexclean(udf_regexReplace(df[6   ]))
,udf_regexclean(udf_regexReplace(df[7   ]))
,udf_regexclean(udf_regexReplace(df[8   ]))
,udf_regexclean(udf_regexReplace(df[9   ]))
,udf_regexclean(udf_regexReplace(df[10  ]))
,udf_regexclean(udf_regexReplace(df[11  ]))
,udf_regexclean(udf_regexReplace(df[12  ]))
,udf_regexclean(udf_regexReplace(df[13  ]))
,udf_regexclean(udf_regexReplace(df[14  ]))
,udf_regexclean(udf_regexReplace(df[15  ]))
,udf_regexclean(udf_regexReplace(df[16  ]))
,udf_regexclean(udf_regexReplace(df[17  ]))
,udf_regexclean(udf_regexReplace(df[18  ]))
,udf_regexclean(udf_regexReplace(df[19  ]))
,udf_regexclean(udf_regexReplace(df[20  ]))
,udf_regexclean(udf_regexReplace(df[21  ]))
,udf_regexclean(udf_regexReplace(df[22  ]))
,udf_regexclean(udf_regexReplace(df[23  ]))
,udf_regexclean(udf_regexReplace(df[24  ]))
,udf_regexclean(udf_regexReplace(df[25  ]))
,udf_regexclean(udf_regexReplace(df[26  ]))
,udf_regexclean(udf_regexReplace(df[27  ]))
,udf_regexclean(udf_regexReplace(df[28  ]))
,udf_regexclean(udf_regexReplace(df[29  ]))
,udf_regexclean(udf_regexReplace(df[30  ]))
,udf_regexclean(udf_regexReplace(df[31  ]))
,udf_regexclean(udf_regexReplace(df[32  ]))
             )
df2=df1.withColumn('ScrapedFilename',lit(blob_filename))

Ричард

1 Ответ

1 голос
/ 20 января 2020

В качестве примера я создал простой пример для реализации ваших потребностей, а не для замены, просто для создания нового кадра данных с помощью применения функции UDF к RDD старого блока данных.

Сначала я создал простой фрейм данных в виде кода и рисунка ниже.

import numpy as np
import pandas as pd
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
sparkDF=spark.createDataFrame(df)
display(sparkDF)

enter image description here

Затем, чтобы определить функцию для каждой строки, как код и рисунок ниже.

def udf_clean(row):
  return (row[0] > 0 and True or False, row[1]+2, row[2]*2, row[3]**4)
new_rdd = sparkDF.rdd.map(lambda row: udf_clean(row))
new_sparkDF = spark.createDataFrame(new_rdd, list('ABCD'))
display(new_sparkDF)

enter image description here

Надеюсь, это поможет.

...