Найти количество разных значений между двумя одинаковыми значениями в CSV-файле, используя pyspark - PullRequest
2 голосов
/ 19 апреля 2019

Я работаю над pyspark для работы с большими CSV-файлами размером более 50 ГБ. Теперь мне нужно найти количество различных значений между двумя ссылками на одно и то же значение. например,

input dataframe:
+----+
|col1|
+----+
|   a|
|   b|
|   c|
|   c| 
|   a|   
|   b|
|   a|     
+----+


output dataframe:
+----+-----+
|col1|col2 |
+----+-----+
|   a| null|
|   b| null|
|   c| null|
|   c|    0| 
|   a|    2|
|   b|    2|   
|   a|    1| 
+----+-----+

Я борюсь с этим в течение прошлой недели. Пробовал оконные функции и многое в искре. Но ничего не мог получить. Было бы здорово помочь, если кто-то знает, как это исправить. Спасибо.

Прокомментируйте, если вам нужны какие-либо разъяснения в вопросе.

Ответы [ 3 ]

0 голосов
/ 20 апреля 2019

Вы можете попробовать следующий подход:

  1. добавить монотонно увеличивающийся столбец id, чтобы отслеживать порядок строк
  2. найти prev_id для каждого col1 и сохранитьрезультат для нового df
  3. для нового DF (псевдоним 'd1'), сделать LEFT JOIN для самого DF (псевдоним 'd2') с условием (d2.id > d1.prev_id) & (d2.id < d1.id)
  4. затем groupby ('d1.col1', 'd1.id') и агрегирование по countDistinct ('d2.col1')

Код, основанный на приведенной выше логике, и ваши примеры данных показаны ниже:

from pyspark.sql import functions as F, Window

df1 = spark.createDataFrame([ (i,) for i in list("abccaba")], ["col1"])

# create a WinSpec partitioned by col1 so that we can find the prev_id
win = Window.partitionBy('col1').orderBy('id')

# set up id and prev_id
df11 = df1.withColumn('id', F.monotonically_increasing_id())\
          .withColumn('prev_id', F.lag('id').over(win))

# check the newly added columns
df11.sort('id').show()
# +----+---+-------+
# |col1| id|prev_id|
# +----+---+-------+
# |   a|  0|   null|
# |   b|  1|   null|
# |   c|  2|   null|
# |   c|  3|      2|
# |   a|  4|      0|
# |   b|  5|      1|
# |   a|  6|      4|
# +----+---+-------+

# let's cache the new dataframe
df11.persist()

# do a self-join on id and prev_id and then do the aggregation
df12 = df11.alias('d1') \
           .join(df11.alias('d2')
               , (F.col('d2.id') > F.col('d1.prev_id')) & (F.col('d2.id') < F.col('d1.id')), how='left') \
           .select('d1.col1', 'd1.id', F.col('d2.col1').alias('ids')) \
           .groupBy('col1','id') \
           .agg(F.countDistinct('ids').alias('distinct_values'))

# display the result
df12.sort('id').show()
# +----+---+---------------+
# |col1| id|distinct_values|
# +----+---+---------------+
# |   a|  0|              0|
# |   b|  1|              0|
# |   c|  2|              0|
# |   c|  3|              0|
# |   a|  4|              2|
# |   b|  5|              2| 
# |   a|  6|              1|
# +----+---+---------------+

# release the cached df11
df11.unpersist()

Примечание вам нужно будет сохранить этот столбец id для сортировки строк, иначе ваши результирующие строки будут полностью испорчены каждый раз, когда вы их собираете.

0 голосов
/ 20 апреля 2019
reuse_distance = []

block_dict = {}
stack_dict = {}
counter_reuse = 0
counter_stack = 0
reuse_list = []

Здесь блок - это не что иное, как символы, которые вы хотите прочитать и найти из csv

stack_list = []
        stack_dist = -1
        reuse_dist = -1
        if block in block_dict:
            reuse_dist = counter_reuse - block_dict[block]-1
            block_dict[block] = counter_reuse
            counter_reuse += 1
            stack_dist_ind= stack_list.index(block)

            stack_dist = counter_stack -stack_dist_ind - 1
            del stack_list[stack_dist_ind]
            stack_list.append(block)

        else:
            block_dict[block] = counter_reuse
            counter_reuse += 1
            counter_stack += 1
            stack_list.append(block)

        reuse_distance_2.append([block, stack_dist, reuse_dist])
0 голосов
/ 19 апреля 2019

Я предоставляю решение с некоторыми допущениями.

Предполагается, что предыдущая ссылка может быть найдена в максимуме предыдущих 'n' строк. Если 'n' - разумное меньшее значение, я думаю, что это хорошее решение.

Я предположил, что вы можете найти предыдущую ссылку в 5 строках.

def get_distincts(list, current_value):
    cnt = {}
    flag = False
    for i in list:
        if current_value == i :
            flag = True
            break
        else:
            cnt[i] = "some_value"

    if flag:
        return len(cnt)
    else:
        return None

get_distincts_udf = udf(get_distincts, IntegerType())

df = spark.createDataFrame([["a"],["b"],["c"],["c"],["a"],["b"],["a"]]).toDF("col1")
#You can replace this, if you have some unique id column 
df = df.withColumn("seq_id", monotonically_increasing_id()) 

window = Window.orderBy("seq_id")
df = df.withColumn("list", array([lag(col("col1"),i, None).over(window) for i in range(1,6) ]))

df = df.withColumn("col2", get_distincts_udf(col('list'), col('col1'))).drop('seq_id','list')
df.show()

что дает

+----+----+
|col1|col2|
+----+----+
|   a|null|
|   b|null|
|   c|null|
|   c|   0|
|   a|   2|
|   b|   2|
|   a|   1|
+----+----+
...