Я создаю небольшой примерный фрейм данных на основе ваших объяснений:
from pyspark.sql import functions as F, types as T
df = spark.createDataFrame(
[
(1, "url_1", [0.3,0.6,], [2,3]),
(2, "url_2", [0.3,0.5,], [1,3]),
(3, "url_3", [0.6,0.5,], [1,2]),
],
["id", "url", "vec", "similar_url"]
)
df.show()
+---+-----+----------+-----------+
| id| url| vec|similar_url|
+---+-----+----------+-----------+
| 1|url_1|[0.3, 0.6]| [2, 3]|
| 2|url_2|[0.3, 0.5]| [1, 3]|
| 3|url_3|[0.6, 0.5]| [1, 2]|
+---+-----+----------+-----------+
Если вы используете версию> 2.4 spark, есть функция с именем "arrays_zip", которую вы можете использовать для замены моего UDF:
outType = T.ArrayType(
T.StructType([
T.StructField("vec",T.FloatType(), True),
T.StructField("similar_url",T.IntegerType(), True),
]))
@F.udf(outType)
def arrays_zip(vec, similar_url):
return zip(vec, similar_url)
тогда вы можете обработать ваши данные:
df.withColumn(
"zips",
arrays_zip(F.col("vec"), F.col("similar_url"))
).withColumn(
"zip",
F.explode("zips")
).alias("df").join(
df.alias("df_2"),
F.col("df_2.id") == F.col("df.zip.similar_url")
).groupBy("df.id", "df.url").agg(
F.collect_list("df.zip.vec").alias("vec"),
F.collect_list("df_2.url").alias("similar_url"),
).show()
+---+-----+----------+--------------+
| id| url| vec| similar_url|
+---+-----+----------+--------------+
| 3|url_3|[0.6, 0.5]|[url_1, url_2]|
| 2|url_2|[0.3, 0.5]|[url_1, url_3]|
| 1|url_1|[0.6, 0.3]|[url_3, url_2]|
+---+-----+----------+--------------+
Если вы хотите сохранить порядок, вам нужно немного больше манипулировать:
@F.udf(T.ArrayType(T.FloatType()))
def get_vec(new_list):
new_list.sort(key=lambda x : x[0])
out_list = [x[1] for x in new_list]
return out_list
@F.udf(T.ArrayType(T.StringType()))
def get_similar_url(new_list):
new_list.sort(key=lambda x : x[0])
out_list = [x[2] for x in new_list]
return out_list
df.withColumn(
"zips",
arrays_zip(F.col("vec"), F.col("similar_url"))
).select(
"id",
"url",
F.posexplode("zips")
).alias("df").join(
df.alias("df_2"),
F.col("df_2.id") == F.col("df.col.similar_url")
).select(
"df.id",
"df.url",
F.struct(
F.col("df.pos").alias("pos"),
F.col("df.col.vec").alias("vec"),
F.col("df_2.url").alias("similar_url"),
).alias("new_struct")
).groupBy(
"id",
"url"
).agg(
F.collect_list("new_struct").alias("new_list")
).select(
"id",
"url",
get_vec(F.col("new_list")).alias("vec"),
get_similar_url(F.col("new_list")).alias("similar_url"),
).show()