У меня есть следующий DataFrame df
в PySpark.
import pyspark.sql.functions as func
df = spark\
.read \
.format("org.elasticsearch.spark.sql") \
.load("my_index/my_mapping") \
.groupBy(["id", "type"]) \
.agg(
func.count(func.lit(1)).alias("number_occurrences"),
func.countDistinct("host_id").alias("number_hosts")
)
ds = df.collect()
Я использую collect
, потому что объем данных после группировки и агрегирования всегда мал и умещается в памяти.Также мне нужно использовать collect
, потому что я передаю ds
в качестве параметра функции udf
.Функция collect
возвращает массив.Как я могу сделать следующие запросы к этому массиву: для заданных id
и type
, вернуть number_occurrences
и number_hosts
.
Например, давайте представим, что df
содержит следующие строки:
id type number_occurrences number_hosts
1 xxx 11 3
2 yyy 10 4
После выполнения df.collect()
, как я могу получить number_occurences
и number_hosts
для id
, равного 1
и type
, равного xxx
.Ожидаемый результат:
number_occurrences = 11
number_hosts = 3
Обновление:
Может быть, есть более элегантное решение?
id = 1
type = "xxx"
number_occurrences = 0
number_hosts = 0
for row in ds:
if (row["id"] == id) & (row["type"] == type):
number_occurrences = row["number_occurrences"]
number_hosts = row["number_hosts"]