Извлечение и замена значений из дублирующихся строк в фрейме данных PySpark - PullRequest
0 голосов
/ 21 июня 2019

У меня есть повторяющиеся строки, которые могут содержать те же данные или имеющие пропущенные значения во фрейме данных PySpark.Код, который я написал, очень медленный и не работает как распределенная система.Кто-нибудь знает, как сохранить отдельные уникальные значения из дублированных строк в кадре данных PySpark, который может работать как распределенная система и с быстрым временем обработки?

Я написал полный код Pyspark, и этот код работает правильно.Но время обработки действительно медленное, и его невозможно использовать в кластере Spark.

'' '

# Columns of duplicate Rows of DF

dup_columns = df.columns

for row_value in df_duplicates.rdd.toLocalIterator():
    print(row_value)

# Match duplicates using std name and create RDD

    fill_duplicated_rdd = ((df.where((sf.col("stdname") == row_value['stdname'] ))
                        .where(sf.col("stdaddress")== row_value['stdaddress']))
                   .rdd.map(fill_duplicates))

    # Creating feature names for the same RDD 

    fill_duplicated_rdd_col_names = (((df.where((sf.col("stdname") == row_value['stdname']) &
                                    (sf.col("stdaddress")== row_value['stdaddress'])))
                   .rdd.map(fill_duplicated_columns_extract)).first())

    # Creating DF using the previous RDD
    # This DF stores value of a single set of matching duplicate rows

    df_streamline = fill_duplicated_rdd.toDF(fill_duplicated_rdd_col_names)

    for column in df_streamline.columns:
        try:

            col_value = ([str(value[column]) for value in 
                      df_streamline.select(col(column)).distinct().rdd.toLocalIterator() if value[column] != ""])        

            if len(col_value) >= 1:
                # non null or empty value of a column store here
                # This value is a no duplicate distinct value
                col_value = col_value[0]
                #print(col_value)

                # The non-duplicate distinct value of the column is stored back to 
                # replace any rows in the PySpark DF that were empty.

                df_dedup = (df_dedup
              .withColumn(column,sf.when((sf.col("stdname") == row_value['stdname'])
                                      & (sf.col("stdaddress")== row_value['stdaddress'])
                                         ,col_value)
                          .otherwise(df_dedup[column])))

            #print(col_value)
        except:
            print("None")

' ''

Ошибок нетсообщения, но код работает очень медленно.Мне нужно решение, которое заполняет строки с уникальными значениями в PySpark DF, которые являются пустыми.Может заполнять строки четным режимом значения

1 Ответ

0 голосов
/ 21 июня 2019

"" "

df_streamline = fill_duplicated_rdd.toDF(fill_duplicated_rdd_col_names)

    for column in df_streamline.columns:
        try:

           # distinct() was replaced by isNOTNULL().limit(1).take(1) to improve the speed of the code and extract values of the row.

           col_value  = df_streamline.select(column).where(sf.col(column).isNotNull()).limit(1).take(1)[0][column]

            df_dedup = (df_dedup
              .withColumn(column,sf.when((sf.col("stdname") == row_value['stdname'])
                                         & (sf.col("stdaddress")== row_value['stdaddress'])
                                         ,col_value)
                          .otherwise(df_dedup[column])))

" ""

...