То, чего я пытаюсь достичь, довольно просто: я хочу проверить все идентификаторы (uuid), если они испытывают определенный «статус» (поведенческий статус). Если они это сделают, то верните мне все записи, связанные с этим идентификатором. Например, если один из идентификаторов ниже имеет статус «три», я хочу сохранить все эти записи, связанные с этим идентификатором. Пока что я могу достичь этого двумя способами:
// first method
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))
// second method
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))
Приведенные выше два метода отлично работают с образцами данных, с которыми я работаю, однако у меня возникают некоторые проблемы с производительностью, когда я начинаю увеличивать объем обрабатываемых данных, поскольку у меня могут быть миллионы до сотен миллионов идентификаторов, которые Мне нужно отфильтровать для. Есть ли более эффективный способ сделать вышеупомянутое, или это просто случай увеличения аппаратного обеспечения, которое я использую?
Ниже приведен пример данных и ожидаемый результат.
val df = Seq(
("1234", "one"),
("1234", "two"),
("1234", "three"),
("234", "one"),
("234", "one"),
("234", "two")
).toDF("id", "status")
df.show
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
| 234| one|
| 234| one|
| 234| two|
+----+------+
dfTransformed.show()
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
+----+------+