Вот настройка для примера (я зарегистрировал таблицы, поскольку кажется, что вы уже работаете с таблицами в первую очередь)
>>> events = sc.parallelize([(2468, 0), (1234, 10)]).toDF(["id", "time"])
>>> data = sc.parallelize([("a", -1),("b", 2),("c", 5),("d", 8),("e", 10),("f", 11),("g", 12)]).toDF(["id", "time"])
>>> events.registerTempTable("events")
>>> data.registerTempTable("data")
, поэтому мы хотим объединить эти две таблицы только тогда, когда time
столбцы содержат delta
друг от друга, это будет abs(data.time - events.time) <= 5
, вы можете отформатировать это для разных дельт
>>> events_range = spark.sql("SELECT events.id as event, data.id, data.time FROM events JOIN data ON abs(data.time - events.time) <= 5")
>>> events_range.show()
+-----+---+----+
|event| id|time|
+-----+---+----+
| 2468| a| -1|
| 2468| b| 2|
| 2468| c| 5|
| 1234| c| 5|
| 1234| d| 8|
| 1234| e| 10|
| 1234| f| 11|
| 1234| g| 12|
+-----+---+----+
, и теперь мы хотим получить каждое уникальное событие результата как собственный фрейм данных
>>> eventNames = events.select("id").collect() #get all events
>>> eventNames
[Row(id=2468), Row(id=1234)]
>>> for evt in eventNames:
... evt_df = events_range.where(f"event = {evt['id']}").drop("event")
... evt_df.show()
...
+---+----+
| id|time|
+---+----+
| a| -1|
| b| 2|
| c| 5|
+---+----+
+---+----+
| id|time|
+---+----+
| c| 5|
| d| 8|
| e| 10|
| f| 11|
| g| 12|
+---+----+
и там вы сохраните их вместо печати ... Также, если некоторые события не соответствуют критериям, вы можете получить пустые кадры данных здесь.