Сравните 2 столбца данных pyspark и измените значения другого столбца на его основе. - PullRequest
0 голосов
/ 15 января 2020

У меня есть проблема, когда я сгенерировал фрейм данных из алгоритма графа, который я написал. Дело в том, что я хочу, чтобы значение базового компонента оставалось одинаковым по существу после каждого запуска кода графа.

Это образец сгенерированного кадра данных:

df = spark.createDataFrame(
    [
        (1, 'A1'), 
        (1, 'A2'),
        (1, 'A3'),
        (2, 'B1'),
        (2, 'B2'),
        (3, 'B3'),
        (4, 'C1'),
        (4, 'C2'),
        (4, 'C3'),
        (4, 'C4'),
        (5, 'D1'),
    ],
    ['old_comp_id', 'db_id'] 
)

После другой прогон, значения полностью изменяются, поэтому новый прогон имеет следующие значения:

df2 = spark.createDataFrame(
    [
        (2, 'A1'), 
        (2, 'A2'),
        (2, 'A3'),
        (3, 'B1'),
        (3, 'B2'),
        (3, 'B3'),
        (1, 'C1'),
        (1, 'C2'),
        (1, 'C3'),
        (1, 'C4'),
        (4, 'D1'),
    ],
    ['new_comp_id', 'db_id'] 
)

Поэтому мне нужно сравнить значения между двумя вышеупомянутыми фреймами данных и изменить значения идентификатора компонента. основанный на связанном идентификаторе базы данных.

  1. , если database_id одинаковы, тогда обновите идентификатор компонента, чтобы быть от 1-го кадра данных
  2. , если они отличаются, тогда назначьте совершенно новый comp_id ( new_comp_id = max (old_comp_id) +1)

Это то, что я придумал до сих пор:

old_ids = df.groupBy("old_comp_id").agg(F.collect_set(F.col("db_id")).alias("old_db_id"))
new_ids = df2.groupBy("new_comp_id").agg(F.collect_set(F.col("db_id")).alias("new_db_id"))

joined = new_ids.join(old_ids,old_ids.old_comp_id == new_ids.new_comp_id,"outer")

joined.withColumn("update_comp", F.when( F.col("new_db_id") == F.col("old_db_id"), F.col('old_comp_id')).otherwise(F.max(F.col("old_comp_id")+1))).show()

1 Ответ

0 голосов
/ 15 января 2020

Чтобы использовать агрегированные функции в неагрегированных столбцах, вы должны использовать Windowing Functions.

Сначала вы внешне объединяете DF с помощью db_id:

from pyspark.sql.functions import when, col, max
joinedDF = df.join(df2, df["db_id"] == df2["new_db_id"], "outer")

Затем, начните строить окно (в котором вы группируете по db_id и упорядочиваете по old_comp_id, чтобы иметь в первых строках old_comp_id с наибольшим значением.

from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("db_id")\
.orderBy(desc("old_comp_id"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

Затем вы строите столбец max с помощью windowSpe c

from pyspark.sql.functions import max
maxCompId = max(col("old_comp_id")).over(windowSpec)

Затем вы применяете его для выбора

joinedDF.select(col("db_id"), when(col("new_db_id").isNotNull(), col("old_comp_id")).otherwise(maxCompId+1).alias("updated_comp")).show()

Для получения дополнительной информации, пожалуйста, обратитесь к документации (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark. sql. Окно )

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...