У меня есть повторяющиеся строки, которые могут содержать те же данные или имеющие пропущенные значения во фрейме данных PySpark.Код, который я написал, очень медленный и не работает как распределенная система.Кто-нибудь знает, как сохранить отдельные уникальные значения из дублированных строк в кадре данных PySpark, который может работать как распределенная система и с быстрым временем обработки?
Я написал полный код Pyspark, и этот код работает правильно.Но время обработки действительно медленное, и его невозможно использовать в кластере Spark.
'' '
# Columns of duplicate Rows of DF
dup_columns = df.columns
for row_value in df_duplicates.rdd.toLocalIterator():
print(row_value)
# Match duplicates using std name and create RDD
fill_duplicated_rdd = ((df.where((sf.col("stdname") == row_value['stdname'] ))
.where(sf.col("stdaddress")== row_value['stdaddress']))
.rdd.map(fill_duplicates))
# Creating feature names for the same RDD
fill_duplicated_rdd_col_names = (((df.where((sf.col("stdname") == row_value['stdname']) &
(sf.col("stdaddress")== row_value['stdaddress'])))
.rdd.map(fill_duplicated_columns_extract)).first())
# Creating DF using the previous RDD
# This DF stores value of a single set of matching duplicate rows
df_streamline = fill_duplicated_rdd.toDF(fill_duplicated_rdd_col_names)
for column in df_streamline.columns:
try:
col_value = ([str(value[column]) for value in
df_streamline.select(col(column)).distinct().rdd.toLocalIterator() if value[column] != ""])
if len(col_value) >= 1:
# non null or empty value of a column store here
# This value is a no duplicate distinct value
col_value = col_value[0]
#print(col_value)
# The non-duplicate distinct value of the column is stored back to
# replace any rows in the PySpark DF that were empty.
df_dedup = (df_dedup
.withColumn(column,sf.when((sf.col("stdname") == row_value['stdname'])
& (sf.col("stdaddress")== row_value['stdaddress'])
,col_value)
.otherwise(df_dedup[column])))
#print(col_value)
except:
print("None")
' ''
Ошибок нетсообщения, но код работает очень медленно.Мне нужно решение, которое заполняет строки с уникальными значениями в PySpark DF, которые являются пустыми.Может заполнять строки четным режимом значения