Я новичок в DataBricks. Моя задача состоит в том, чтобы прочитать несколько больших файлов CSV (размером до 1 гигабайта), проверить и очистить все поля, готовые для считывания многоосновной базы в Azure DW. Файлы хранятся в blob.
Я думал, что DatBricks и Python будут подходом, который даст ощутимую производительность.
Я использовал пример QuickStart, показанный ниже, в качестве отправной точки: https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal
Я хочу выполнить несколько чистящих замен для каждого поля, а также запустить Regex, чтобы отфильтровать любые другие нежелательные символы и, наконец, обрезать, чтобы удалить завершающие пробелы. Ниже приведен фрагмент тестового примера, который дает представление о типе проверки, которую я буду выполнять sh. В этом примере используется udf для перевода значения, а затем регулярное выражение для фильтрации нежелательных символов, в примере, показанном в ссылке.
import pyspark.sql.functions as f
def udf_clean (s):
return (f.translate(s,'3','B'))
df.filter(df.category=='Housing').select(df[1],f.trim(f.regexp_replace(udf_clean(df[1]),'(\d+)',''))).show()
Я не могу выяснить, как я могу выполнить эти переводы в целом dataframe. Я хотел бы очистить весь фрейм данных за один проход. Так как он основан на векторах, я чувствую, что не нужно перебирать его по очереди, а просто выполнять какие-то операции в целом. Я понимаю, как выполнить итерацию строки, как в
`for row in df.rdd.collect():
do_something(row)`
.., но я чувствую, что должен быть в состоянии сделать что-то более эффективно для всего набора полей. Правильно ли это мышление, и есть ли у кого-нибудь примеры, пожалуйста? Большое спасибо, Ричард
Результирующий код, но не ответ
Я не нашел ответа на этот вопрос, но я хотел бы опубликовать свой код, который, как вы увидите, не элегантен, но работает .
from pyspark.sql import functions as f
from pyspark.sql.functions import regexp_replace, udffrom pyspark.sql.functions import translate, udf
from pyspark.sql.functions import trim, udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
def udf_regexclean(s):
return trim(regexp_replace(s,'([^\p{L}\p{Nd} ''@.():_*\-&+\/,])',''))
def udf_regexReplace(s):
return regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(s,'£','GBR'),'’',''),' ',''),"'t",''),'É', 'E')
df1=df.select( udf_regexclean(udf_regexReplace(df[0 ]))
,udf_regexclean(udf_regexReplace(df[1 ]))
,udf_regexclean(udf_regexReplace(df[2 ]))
,udf_regexclean(udf_regexReplace(df[3 ]))
,udf_regexclean(udf_regexReplace(df[4 ]))
,udf_regexclean(udf_regexReplace(df[5 ]))
,udf_regexclean(udf_regexReplace(df[6 ]))
,udf_regexclean(udf_regexReplace(df[7 ]))
,udf_regexclean(udf_regexReplace(df[8 ]))
,udf_regexclean(udf_regexReplace(df[9 ]))
,udf_regexclean(udf_regexReplace(df[10 ]))
,udf_regexclean(udf_regexReplace(df[11 ]))
,udf_regexclean(udf_regexReplace(df[12 ]))
,udf_regexclean(udf_regexReplace(df[13 ]))
,udf_regexclean(udf_regexReplace(df[14 ]))
,udf_regexclean(udf_regexReplace(df[15 ]))
,udf_regexclean(udf_regexReplace(df[16 ]))
,udf_regexclean(udf_regexReplace(df[17 ]))
,udf_regexclean(udf_regexReplace(df[18 ]))
,udf_regexclean(udf_regexReplace(df[19 ]))
,udf_regexclean(udf_regexReplace(df[20 ]))
,udf_regexclean(udf_regexReplace(df[21 ]))
,udf_regexclean(udf_regexReplace(df[22 ]))
,udf_regexclean(udf_regexReplace(df[23 ]))
,udf_regexclean(udf_regexReplace(df[24 ]))
,udf_regexclean(udf_regexReplace(df[25 ]))
,udf_regexclean(udf_regexReplace(df[26 ]))
,udf_regexclean(udf_regexReplace(df[27 ]))
,udf_regexclean(udf_regexReplace(df[28 ]))
,udf_regexclean(udf_regexReplace(df[29 ]))
,udf_regexclean(udf_regexReplace(df[30 ]))
,udf_regexclean(udf_regexReplace(df[31 ]))
,udf_regexclean(udf_regexReplace(df[32 ]))
)
df2=df1.withColumn('ScrapedFilename',lit(blob_filename))
Ричард