Как получить конкретные значения из результата df.collect () в PySpark? - PullRequest
0 голосов
/ 10 сентября 2018

У меня есть следующий DataFrame df в PySpark.

import pyspark.sql.functions as func

df = spark\
        .read \
        .format("org.elasticsearch.spark.sql") \
        .load("my_index/my_mapping") \
        .groupBy(["id", "type"]) \
        .agg(
            func.count(func.lit(1)).alias("number_occurrences"),
            func.countDistinct("host_id").alias("number_hosts")
        )

ds = df.collect()

Я использую collect, потому что объем данных после группировки и агрегирования всегда мал и умещается в памяти.Также мне нужно использовать collect, потому что я передаю ds в качестве параметра функции udf.Функция collect возвращает массив.Как я могу сделать следующие запросы к этому массиву: для заданных id и type, вернуть number_occurrences и number_hosts.

Например, давайте представим, что df содержит следующие строки:

id   type   number_occurrences   number_hosts
1    xxx    11                   3
2    yyy    10                   4 

После выполнения df.collect(), как я могу получить number_occurences и number_hosts для id, равного 1 и type, равного xxx.Ожидаемый результат:

number_occurrences = 11
number_hosts = 3

Обновление:

Может быть, есть более элегантное решение?

    id = 1
    type = "xxx"
    number_occurrences = 0
    number_hosts = 0
    for row in ds:
        if (row["id"] == id) & (row["type"] == type):
            number_occurrences = row["number_occurrences"]
            number_hosts = row["number_hosts"]

1 Ответ

0 голосов
/ 11 сентября 2018

Если ваш id уникален, что должно иметь место для идентификатора, вы можете отсортировать массив на основе идентификатора. Это просто гарантирует правильный порядок, и если ваш идентификатор является последовательным, вы можете напрямую получить доступ к записи и вычесть идентификатор на 1

test_df = spark.createDataFrame([
(1,"xxx",11,3),(2,"yyyy",10,4),

], ("id","type","number_occurrences","number_hosts"))
id = 1
type = "xxx"
sorted_list = sorted(test_df.collect(), cmp=lambda x,y: cmp(x["id"],y["id"]))
sorted_list[id-1]["number_occurrences"],sorted_list[id-1]["number_hosts"]

Результат:

(11, 3)
...