Преобразовать фрейм Spark Data в несколько списков с одним столбцом в качестве ключа - PullRequest
1 голос
/ 29 апреля 2019

Рассмотрим кадр данных Spark df вот так

+----+-------+----+----+
|bin|median|min|end|
+----+-------+----+----+
|   1|    0.0|   0|   0.5|
|   2|    1.0|   0.8|   1.7|
|   3|    2.0|   1.6|   2.5|
|   4|    4.0|   3.7|   4.7|
|   5|    6.0|   5.7|   6.3|

Я хотел бы вытащить каждый атрибут / столбец как отдельный словарь / список с ключом bin , что означает

median[1] = 0.0 #df[df.bin == 1]
median[key= 1,2,3,4,5] = [0.0,1.0,2.0,4.0,6.0]
min[key= 1,2,3,4,5] = [0,0.8,1.6,3.7,5.7]

Я имею в виду что-то вроде отображения в rdd, а как насчет чего-то более манипулирующего "кадрами данных"? Есть ли способ вытащить все списки одновременно?

median = {}
df.rdd.map(lambda row : median[row.bin] = row.median)

Каков ответ, если я хочу вытащить список вместо словаря, предполагая, что корзина будет пронумерована непрерывно от 1? Как мы можем обеспечить сохранность заказа? .orderBy().collect()?

Ответы [ 2 ]

1 голос
/ 01 мая 2019

Вот еще один подход, который обеспечивает поддержку фильтрации ключей и столбцов.Решение состоит из двух функций:

  • as_dict(df, cols, ids, key): возвращает данные в словарь
  • extract_col_from_dict(dct, col, ids): извлекает данные столбца из словаря

Изначально давайте извлечем нужные данные в словарь из указанного кадра данных:

def as_dict(df, cols = [], ids = [], key = 0):
  key_idx = 0

  if isinstance(key, int):
    key_idx = key
    key = df.columns[key_idx]
  elif isinstance(key, str):
    key_idx = df.columns.index(key)
  else:
    raise Exception("Please provide a valid key e.g:{1, 'col1'}")

  df = df.select("*") if not cols else df.select(*[[key] + cols])

  if ids:
    df = df.where(df[key].isin(ids))

  return df.rdd.map(lambda x : (x[key_idx], x.asDict())).collectAsMap()

Аргументы:

  • df :фрейм данных
  • столбцы : столбцы, с которыми вы хотите работать, по умолчанию включают все столбцы
  • идентификаторы : по порядкучтобы избежать сбора всего набора данных в драйвере, вы можете фильтровать на основе этого.Это относится к столбцу key . По умолчанию включить все записи
  • ключ : ключевой столбец, это может быть строка / int, по умолчанию 0

Давайте вызовем функцию с вашим набором данных:

df = spark.createDataFrame(
[(1, 0.0, 0., 0.5),
(2, 1.0, 0.8, 1.7),
(3, 2.0, 1.6, 2.5),
(4, 4.0, 3.7, 4.7),
(5, 6.0, 5.7, 6.3)], ["bin", "median", "min", "end"])

dict_ = as_dict(df)
dict_
{1: {'bin': 1, 'min': 0.0, 'end': 0.5, 'median': 0.0},
 2: {'bin': 2, 'min': 0.8, 'end': 1.7, 'median': 1.0},
 3: {'bin': 3, 'min': 1.6, 'end': 2.5, 'median': 2.0},
 4: {'bin': 4, 'min': 3.7, 'end': 4.7, 'median': 4.0},
 5: {'bin': 5, 'min': 5.7, 'end': 6.3, 'median': 6.0}}

# or with filters applied
dict_ = as_dict(df, cols = ['min', 'end'], ids = [1, 2, 3])
dict_
{1: {'bin': 1, 'min': 0.0, 'end': 0.5},
 2: {'bin': 2, 'min': 0.8, 'end': 1.7},
 3: {'bin': 3, 'min': 1.6, 'end': 2.5}}

Функция отобразит записи в пары ключ / значение , где значением будет также словарь (вызывая row.asDict).

После вызова функции as_dict данные будут расположены в драйвере, и теперь вы можете извлечь нужные данные с помощью extract_col_from_dict:

def extract_col_from_dict(dct, col, ids = []):
  filtered = {}
  if ids:
    filtered = { key:val for key, val in dct.items() if key in ids }
  else:
    filtered = { key:val for key, val in dct.items() }

  return [d[col] for d in list(filtered.values())]

Аргументы:

  • dct : исходный словарь
  • col : извлекаемый столбец
  • идентификаторы : дополнительная фильтрация, по умолчанию все записи

И вывод функции:

min_data = extract_col_from_dict(dict_, 'min')
min_data
[0.0, 0.8, 1.6, 3.7, 5.7]
1 голос
/ 29 апреля 2019

Если вы все равно пытаетесь collect данных, самый простой способ IMO получить данные в нужном вам формате - через панд.

Вы можете вызвать toPandas(), установить индекс на bin, а затем вызвать to_dict():

output = df.toPandas().set_index("bin").to_dict()
print(output)
#{'end': {1: 0.5, 2: 1.7, 3: 2.5, 4: 4.7, 5: 6.3},
# 'median': {1: 0.0, 2: 1.0, 3: 2.0, 4: 4.0, 5: 6.0},
# 'min': {1: 0.0, 2: 0.8, 3: 1.6, 4: 3.7, 5: 5.7}}

Это создаст словарь словарей, где внешний ключимя столбца и внутренний ключ - это корзина.Если вам нужны отдельные переменные, вы можете просто извлечь из output, но не используйте min в качестве имени переменной, так как оно будет топать __builtin__.min.

median, min_, end = output['median'], output['min'], output['end']
print(median[1])
#0.0
...