Dask to Flatten словарь - PullRequest
2 голосов
/ 13 мая 2019

Я новичок в Dask и ищу способ выравнивания столбца словаря в кадре данных PANDAS. Вот снимок экрана с первой строкой 16-миллионного ряда данных:

screenshot of first two rows of data

А вот пример текста из трех строк:

{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}

Обычно я выравниваю столбец Form990PartVIISectionAGrp со следующим кодом:

    df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)

Я пытаюсь сделать это в Dask, но получаю следующую ошибку: «ValueError: Столбцы в вычисленных данных не соответствуют столбцам в предоставленных метаданных.»

Я использую Python 2.7. Я импортирую соответствующие пакеты

    from dask import dataframe as dd
    from dask.multiprocessing import get
    from multiprocessing import cpu_count
    nCores = cpu_count()

Для проверки кода я создал случайную выборку данных:

    dfs = df.sample(1000)

А затем сгенерируйте кадр данных Dask:

    ddf = dd.from_pandas(dfs, npartitions=nCores)

В данный момент столбец имеет строковый формат, поэтому я преобразую его в словарь. Обычно я бы написал одну строчку кода:

dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval) 

Но вместо этого я попытался сделать это здесь в более «похожей на Даск» форме, поэтому я пишу следующую функцию и затем применяю ее:

    def make_dict(dfs):
        dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)   
        return dfs
    ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()

Это работает - он возвращает фрейм данных PANDAS, в котором столбец Form990PartVIISectionAGrp представлен в формате словаря (однако, он работает не быстрее, чем не-Dask).

ddf_out

Затем я воссоздаю Dask DF:

    ddf = dd.from_pandas(ddf_out, npartitions=nCores)

И напишите функцию для выравнивания столбца:

    def flatten(ddf_out):
        ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        #ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
    return ddf_out

Если я запусту этот код:

    result = ddf.map_partitions(flatten)

Я получаю следующий вывод, где столбец не был сплющен:

result

Я также получал ошибки об отсутствующих метаданных, и, учитывая, что вышеупомянутое не помогло разобрать столбец словаря, поэтому я создал список столбцов, полученных простым выравниванием столбцов Python, и использовал его для создания словаря столбцов и типов данных:

metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
       u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
       u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
       u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
       u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
       u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
       u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}

Затем я применяю функцию выравнивания с этими метаданными:

    result = ddf.map_partitions(flatten, meta=metadir)

В результате я получаю следующий вывод:

result

Запуск result.columns дает следующее:

result.columns

Где это не удается, - это выполнение compute (), где я получаю следующее сообщение об ошибке: «ValueError: Столбцы в вычисленных данных не совпадают со столбцами в предоставленных метаданных». Я получаю ту же ошибку, пишу ли я:

result.compute()

или

result.compute(meta=metadir)

Я не уверен, что я здесь делаю неправильно. Столбцы в result , похоже, совпадают с столбцами в metadir . Любые предложения будут с благодарностью.

UPDATE: Вот мой пример обновления функции сглаживания.

    meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
     'F9_07_PZ_DIRTRSTKEY_NAME',
     'F9_07_PZ_COMP_OTHER',
     'F9_07_PZ_COMP_RELATED',
     'F9_07_PZ_TITLE',
     'F9_07_PZ_AVE_HOURS_WEEK',
     'F9_07_PZ_COMP_DIRECT',
     'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
     'F9_07_PC_OFFICER',
     'F9_07_PC_HIGH_COMP_EMPLOYEE',
     'BusinessName',
     'F9_07_PC_KEY_EMPLOYEE',
     'F9_07_PC_TRUSTEE_INSTITUTIONAL',
     'NameBusiness',
     'F9_07_PC_FORMER'], dtype="O")

    def flatten(ddf_out):
        ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        for m in meta:
            if m not in ddf_out:
                df[m] = '' 
        return ddf_out

Тогда я бегу:

result = ddf.map_partitions(flatten, meta=meta).compute()

Ответы [ 2 ]

2 голосов
/ 17 мая 2019

Несколько замечаний для начала:

.Не (literal_eval)

Разве это не было бы лучше, чем map?

Затем я воссоздаю Dask DF:

ddf = dd.from_pandas (ddf_out, npartitions = nCores)

ddf_out уже был фреймом данных dask, я не знаю, почему вы должны были это сделать.

Кажется, что столбцы в результате совпадают со столбцами в metadir.

Значение result.columns взято из предоставленной вами мета, никаких вычислений не происходит, пока вы не попросите об этом (dask ленив в большинстве операций). Разве исключение ValueError не предоставляет дополнительную информацию?

Вот полный пример

x = ({'F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'CHAIR PERSON',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
 {'F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'VICE CHAIR',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
df = pd.DataFrame({'a': x})
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT', 
       'F9_07_PZ_DIRTRSTKEY_NAME',
       'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
       'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()

Как я узнал, что meta использовать? Я применил функцию к фрейму данных pandas - для этого вы можете использовать небольшой фрагмент фрейма данных.

Некоторые дополнительные примечания:

  • Это анти-паттерн для загрузки данных с пандами, передачи работникам, работающим с дасками, а затем для сбора всего результата обратно на фрейм данных панд (в памяти), вы вряд ли увидите ускорение таким образом и можете повлечь за собой много накладных расходов. Вам лучше загружать что-то вроде dd.read_csv, а также агрегировать или писать с помощью функций dask. Только compute() для чего-то, что будет маленьким или не вернет ничего (потому что это включает запись вывода). Официальные примеры не используют from_pandas.
  • Обработка строк и dict - это методы python, поэтому они содержат блокировку интерпретатора (GIL) любой функции python: потоки фактически не будут работать параллельно. Чтобы получить параллелизм, вам нужно запускать процессы, чего проще всего добиться с помощью https://docs.dask.org/en/latest/setup/single-distributed.html
  • распределенный планировщик также предоставляет вам доступ к панели мониторинга, которая содержит много полезной информации для диагностики работы вашей системы. Вы также можете настроить его поведение на случай, если у вас есть правила брандмауэра, которым нужно следовать.
0 голосов
/ 28 мая 2019

Учитывая небольшой или средний набор данных, простое решение PANDAS будет работать:

df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)

Однако с 16 миллионами строк решение PANDAS не будет работать ни на Macbook с 16 ГБ ОЗУ, ни на компьютере Windows с 96 ГБ. По этой причине я посмотрел на Даск. Однако, как видно из приведенных выше ответов и комментариев, решение Dask не работает, поскольку каждое наблюдение в моем наборе данных не обязательно имеет все ключи словаря. В совокупности 16 миллионов наблюдений Form990PartVIISectionAGrp имеют 15 ключей в следующем списке:

  newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
 'F9_07_PZ_DIRTRSTKEY_NAME',
 'F9_07_PZ_COMP_OTHER',
 'F9_07_PZ_COMP_RELATED',
 'F9_07_PZ_TITLE',
 'F9_07_PZ_AVE_HOURS_WEEK',
 'F9_07_PZ_COMP_DIRECT',
 'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
 'F9_07_PC_OFFICER',
 'F9_07_PC_HIGH_COMP_EMPLOYEE',
 'BusinessName',
 'F9_07_PC_KEY_EMPLOYEE',
 'F9_07_PC_TRUSTEE_INSTITUTIONAL',
 'NameBusiness',
 'F9_07_PC_FORMER']

Итак, мое решение заключалось в том, чтобы взять некоторые подсказки, предоставленные выше @mdurant, и сначала добавить недостающие ключи в каждую строку:

for index, row in df[:].iterrows():
    for k in newkeys:
        row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)

Это заняло 100 минут на моем Macbook. На основании комментария mdurant я затем сохранил фрейм данных в формате JSON:

df.to_json('df.json', orient='records', lines=True)

И прочитать файл в Dask как текст:

import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)

Затем создайте функцию для выравнивания столбца:

def flatten(record):
    return {
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
    'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
    'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
    'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
    'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],  
    'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],  
    'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
    'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
    'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
    'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
    'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
    'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
    'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
    'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
    'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
    'URL': record['URL'],
}

Затем я могу применить функцию:

df = b.map(flatten).to_dataframe()

И экспортировать данные в CSV:

df.to_csv('compensation*.csv')

Это работает как шарм! Короче говоря, основываясь на полезных комментариях mdurant выше, ключи: 1) добавить недостающие ключи ко всем наблюдениям и 2) не читать ваши данные в Dask из PANDAS (вместо этого используйте текст или CSV). Уход за этими двумя проблемами привел к хорошему решению этой проблемы.

...