пожалуйста, опишите, какие логики c вы хотите попробовать. Цикл DF может быть выполнен с помощью подхода SQL, или вы также можете следовать нижеприведенному подходу RDD
def my_function(each_record):
#my_logic
#loop through for each command.
df.rdd.foreach(my_function)
Добавлен следующий код, основанный на вашем вводе
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()
+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B| 1| null| null|
|product A| 1| 1| null|
|product C| null| null| 1|
+---------+---------+---------+---------+
Окончательный подход, который соответствует вашему требование
df = spark.read.csv ("/ mylocation / 61250775.csv", header = True, inferSchema = True, sep = "|") df.printSchema ()
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- matchval1: integer (nullable = true)
|-- similar: string (nullable = true)
|-- matchval3: integer (nullable = true)
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()
+---------+-------------------+
| id| Similar_Items|
+---------+-------------------+
|product B| product X|
|product A|product X,product Y|
|product C| product Z|
+---------+-------------------+