Я понимаю, что этот вопрос, возможно, задавался раньше, но я не нашел решения, которое работает специально для строк и является относительно простым.
У меня есть фрейм данных, в котором есть столбец с почтовым индексом который использует удаленный API для получения сведений об этом почтовом индексе. Я пытаюсь распараллелить выборку данных, чтобы выполнить ее в нескольких потоках.
Простой пример:
def get_cities_by_zip_code(zip):
resp = requests.post(geo_svc_url, json={'query': """query GetZipCodeInformation($zip: Float!) {
zipCode(zip: $zip) {
....
}
}""", 'variables': {'zip': zip}})
return resp.json()['data']['zipCode']
def location_options(df):
resp = get_cities_by_zip_code(df['Zip code'])
if resp is not None:
df['City'] = resp['preferredName']
df['Population'] = (next(x for x in resp['places'] if x['type'] == 'city') or { 'population': 'n/a' })['population']
return df
def make_df():
// A function that generates initial dataframe
df = make_df()
Затем я должен применить location_options
к df
параллельно. Я попробовал пару решений для достижения этой цели. Например:
- Через
multiprocessing
num_partitions = 20 #number of partitions to split dataframe
num_cores = 8 #number of cores on your machine
def parallelize_dataframe(df, func):
df_split = np.array_split(df, num_partitions)
pool = Pool(num_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
df = parallelize_dataframe(df, location_options)
Не работает (не полная трассировка стека).
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
TypeError: Object of type Series is not JSON serializable
swifter
- по некоторым причинам не работает со строками, работает только с одним потоком.
Простой ли
df = df.apply(location_options, axis=1)
работает просто отлично. Но он однопоточный.