Ваша проблема в том, что вы сравниваете данные с колонкой DataFrame (max_timestamp.max) из другого DataFrame. Вам нужно либо собрать (collect) результат как строку (String), либо добавить его через crossJoin как новый столбец для сравнения.
Воспроизводимый пример
# Two small example DataFrames sharing the same (id, timestamp) string schema.
rows_a = [("1", "2020-01-01 00:00:00"), ("2", "2020-02-01 23:59:59")]
rows_b = [("1", "2020-01-15 00:00:00"), ("2", "2020-01-16 23:59:59")]
df1 = spark.createDataFrame(rows_a, ["id", "timestamp"])
df2 = spark.createDataFrame(rows_b, ["id", "timestamp"])
Собрать как String
# Collect the maximum timestamp from df2 as a plain Python string, then use
# it as a literal value in the filter.  `max` is imported under an alias so
# the Python builtin `max` is not shadowed.
from pyspark.sql.functions import col, max as max_

# A global aggregate already returns exactly one row, so the original
# `.distinct()` call was redundant and has been dropped.
max_timestamp = df2.select(max_(col("timestamp")).alias("max")).collect()[0][0]
max_timestamp
# '2020-01-16 23:59:59'
df1.where(col("timestamp") > max_timestamp).show(10, truncate=False)
# +---+-------------------+
# |id |timestamp          |
# +---+-------------------+
# |2  |2020-02-01 23:59:59|
# +---+-------------------+
crossJoin как новый столбец
# Compute the filter threshold as a one-row, one-column DataFrame that can
# later be cross-joined onto df1.  `max` is imported under an alias so the
# Python builtin `max` is not shadowed; `col` stays imported for the
# comparison step below.
from pyspark.sql.functions import col, max as max_

intermediate = (
    df2
    .agg(max_(col("timestamp")).alias("start_date_filter"))
)
intermediate.show(1, truncate=False)
# +-------------------+
# |start_date_filter  |
# +-------------------+
# |2020-01-16 23:59:59|
# +-------------------+
# Attach the scalar threshold to every row of df1 via a cross join (the
# right side has exactly one row), then compare column to column.
joined = df1.crossJoin(intermediate)
filtered = joined.where(col("timestamp") > col("start_date_filter"))
filtered.show(10, truncate=False)
# +---+-------------------+-------------------+
# |id |timestamp          |start_date_filter  |
# +---+-------------------+-------------------+
# |2  |2020-02-01 23:59:59|2020-01-16 23:59:59|
# +---+-------------------+-------------------+