Функция, которую вы ищете: join
. Вот простой пример, основанный на ваших данных:
import pyspark as sp
from pyspark.sql import SparkSession
samples = [{'store_product_id':1,'time_create':2,'last_timestamp':3},{'store_product_id':2,'time_create':2,'last_timestamp':2},{'store_product_id':3,'time_create':4,'last_timestamp':4},{'store_product_id':4,'time_create':2,'last_timestamp':5}]
spark = SparkSession \
.builder \
.appName('test') \
.getOrCreate()
df1 = spark.createDataFrame(samples)
df1.show()
Это дает:
+--------------+----------------+-----------+
|last_timestamp|store_product_id|time_create|
+--------------+----------------+-----------+
| 3| 1| 2|
| 2| 2| 2|
| 4| 3| 4|
| 5| 4| 2|
+--------------+----------------+-----------+
Давайте отфильтруем по времени и создадим еще один кадр данных из этого:
df2 = df1.filter(df1.time_create==df1.last_timestamp)
ids = df2.select('store_product_id').show()
+----------------+
|store_product_id|
+----------------+
| 2|
| 3|
+----------------+
И здесь мы объединяем оба набора данных в store_product_id :
df3 = df1.join(df2,'store_product_id','inner').show()
+----------------+--------------+-----------+--------------+-----------+
|store_product_id|last_timestamp|time_create|last_timestamp|time_create|
+----------------+--------------+-----------+--------------+-----------+
| 3| 4| 4| 4| 4|
| 2| 2| 2| 2| 2|
+----------------+--------------+-----------+--------------+-----------+
Внутреннее соединение дает пересечение df1 и df2 на основе store_product_id