Вот еще один подход, который обеспечивает поддержку фильтрации ключей и столбцов.Решение состоит из двух функций:
as_dict(df, cols, ids, key)
: возвращает данные в словарь extract_col_from_dict(dct, col, ids)
: извлекает данные столбца из словаря
Изначально давайте извлечем нужные данные в словарь из указанного кадра данных:
def as_dict(df, cols = [], ids = [], key = 0):
key_idx = 0
if isinstance(key, int):
key_idx = key
key = df.columns[key_idx]
elif isinstance(key, str):
key_idx = df.columns.index(key)
else:
raise Exception("Please provide a valid key e.g:{1, 'col1'}")
df = df.select("*") if not cols else df.select(*[[key] + cols])
if ids:
df = df.where(df[key].isin(ids))
return df.rdd.map(lambda x : (x[key_idx], x.asDict())).collectAsMap()
Аргументы:
- df :фрейм данных
- столбцы : столбцы, с которыми вы хотите работать, по умолчанию включают все столбцы
- идентификаторы : по порядкучтобы избежать сбора всего набора данных в драйвере, вы можете фильтровать на основе этого.Это относится к столбцу key . По умолчанию включить все записи
- ключ : ключевой столбец, это может быть строка / int, по умолчанию 0
Давайте вызовем функцию с вашим набором данных:
df = spark.createDataFrame(
[(1, 0.0, 0., 0.5),
(2, 1.0, 0.8, 1.7),
(3, 2.0, 1.6, 2.5),
(4, 4.0, 3.7, 4.7),
(5, 6.0, 5.7, 6.3)], ["bin", "median", "min", "end"])
dict_ = as_dict(df)
dict_
{1: {'bin': 1, 'min': 0.0, 'end': 0.5, 'median': 0.0},
2: {'bin': 2, 'min': 0.8, 'end': 1.7, 'median': 1.0},
3: {'bin': 3, 'min': 1.6, 'end': 2.5, 'median': 2.0},
4: {'bin': 4, 'min': 3.7, 'end': 4.7, 'median': 4.0},
5: {'bin': 5, 'min': 5.7, 'end': 6.3, 'median': 6.0}}
# or with filters applied
dict_ = as_dict(df, cols = ['min', 'end'], ids = [1, 2, 3])
dict_
{1: {'bin': 1, 'min': 0.0, 'end': 0.5},
2: {'bin': 2, 'min': 0.8, 'end': 1.7},
3: {'bin': 3, 'min': 1.6, 'end': 2.5}}
Функция отобразит записи в пары ключ / значение , где значением будет также словарь (вызывая row.asDict).
После вызова функции as_dict данные будут расположены в драйвере, и теперь вы можете извлечь нужные данные с помощью extract_col_from_dict:
def extract_col_from_dict(dct, col, ids = []):
filtered = {}
if ids:
filtered = { key:val for key, val in dct.items() if key in ids }
else:
filtered = { key:val for key, val in dct.items() }
return [d[col] for d in list(filtered.values())]
Аргументы:
- dct : исходный словарь
- col : извлекаемый столбец
- идентификаторы : дополнительная фильтрация, по умолчанию все записи
И вывод функции:
min_data = extract_col_from_dict(dict_, 'min')
min_data
[0.0, 0.8, 1.6, 3.7, 5.7]