У меня есть проблема, когда я сгенерировал фрейм данных из алгоритма графа, который я написал. Дело в том, что я хочу, чтобы значение базового компонента оставалось одинаковым по существу после каждого запуска кода графа.
Это образец сгенерированного кадра данных:
df = spark.createDataFrame(
[
(1, 'A1'),
(1, 'A2'),
(1, 'A3'),
(2, 'B1'),
(2, 'B2'),
(3, 'B3'),
(4, 'C1'),
(4, 'C2'),
(4, 'C3'),
(4, 'C4'),
(5, 'D1'),
],
['old_comp_id', 'db_id']
)
После другой прогон, значения полностью изменяются, поэтому новый прогон имеет следующие значения:
df2 = spark.createDataFrame(
[
(2, 'A1'),
(2, 'A2'),
(2, 'A3'),
(3, 'B1'),
(3, 'B2'),
(3, 'B3'),
(1, 'C1'),
(1, 'C2'),
(1, 'C3'),
(1, 'C4'),
(4, 'D1'),
],
['new_comp_id', 'db_id']
)
Поэтому мне нужно сравнить значения между двумя вышеупомянутыми фреймами данных и изменить значения идентификатора компонента. основанный на связанном идентификаторе базы данных.
- , если database_id одинаковы, тогда обновите идентификатор компонента, чтобы быть от 1-го кадра данных
- , если они отличаются, тогда назначьте совершенно новый comp_id ( new_comp_id = max (old_comp_id) +1)
Это то, что я придумал до сих пор:
old_ids = df.groupBy("old_comp_id").agg(F.collect_set(F.col("db_id")).alias("old_db_id"))
new_ids = df2.groupBy("new_comp_id").agg(F.collect_set(F.col("db_id")).alias("new_db_id"))
joined = new_ids.join(old_ids,old_ids.old_comp_id == new_ids.new_comp_id,"outer")
joined.withColumn("update_comp", F.when( F.col("new_db_id") == F.col("old_db_id"), F.col('old_comp_id')).otherwise(F.max(F.col("old_comp_id")+1))).show()