Как мне распараллелить .apply в pandas на строку? - PullRequest
0 голосов
/ 16 апреля 2020

Я понимаю, что этот вопрос, возможно, задавался раньше, но я не нашел решения, которое работает специально для строк и является относительно простым.

У меня есть фрейм данных, в котором есть столбец с почтовым индексом который использует удаленный API для получения сведений об этом почтовом индексе. Я пытаюсь распараллелить выборку данных, чтобы выполнить ее в нескольких потоках.

Простой пример:

def get_cities_by_zip_code(zip):
    resp = requests.post(geo_svc_url, json={'query': """query GetZipCodeInformation($zip: Float!) {
  zipCode(zip: $zip) {
    ....
  }
}""", 'variables': {'zip': zip}})

    return resp.json()['data']['zipCode']


def location_options(df):
  resp = get_cities_by_zip_code(df['Zip code'])

  if resp is not None:
    df['City'] = resp['preferredName']
    df['Population'] = (next(x for x in resp['places'] if x['type'] == 'city') or { 'population': 'n/a' })['population']

  return df

def make_df():
  // A function that generates initial dataframe


df = make_df()

Затем я должен применить location_options к df параллельно. Я попробовал пару решений для достижения этой цели. Например:

  1. Через multiprocessing
num_partitions = 20 #number of partitions to split dataframe
num_cores = 8 #number of cores on your machine

def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df


df = parallelize_dataframe(df, location_options)

Не работает (не полная трассировка стека).

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
TypeError: Object of type Series is not JSON serializable
swifter - по некоторым причинам не работает со строками, работает только с одним потоком.

Простой ли

df = df.apply(location_options, axis=1)

работает просто отлично. Но он однопоточный.

1 Ответ

0 голосов
/ 16 апреля 2020

Возможно, я нашел решение из соответствующего поста.

Этот работал для меня, а другие - нет. Мне тоже пришлось это: https://github.com/darkskyapp/forecast-ruby/issues/13

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...