Сбой команды pandas merge в параллельном цикле - «ValueError: исходный массив буфера доступен только для чтения» - PullRequest
0 голосов
/ 08 мая 2019

Я пишу алгоритм начальной загрузки, используя параллельные циклы и панды. Проблема, с которой я столкнулся, заключается в том, что команда слияния внутри параллельного цикла вызывает ошибку «ValueError: исходный массив буфера доступен только для чтения», но только если я использую полный набор данных для слияния (120 тыс. Строк). Любое подмножество, содержащее менее 12 тыс. Строк, будет работать нормально, поэтому я полагаю, что это не проблема синтаксиса. Что я могу сделать?

Текущая версия pandas - 0.24.2, а cython - 0.29.7.

_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 418, in _process_worker
    r = call_item()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 272, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 567, in __call__
    return self.func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/parallel.py", line 225, in __call__
    for func, args, kwargs in self.items]
  File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/parallel.py", line 225, in <listcomp>
    for func, args, kwargs in self.items]
  File "<ipython-input-72-cdb83eaf594c>", line 12, in bootstrap
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/frame.py", line 6868, in merge
    copy=copy, indicator=indicator, validate=validate)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 48, in merge
    return op.get_result()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 546, in get_result
    join_index, left_indexer, right_indexer = self._get_join_info()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 756, in _get_join_info
    right_indexer) = self._get_join_indexers()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 735, in _get_join_indexers
    how=self.how)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 1130, in _get_join_indexers
    llab, rlab, shape = map(list, zip(* map(fkeys, left_keys, right_keys)))
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 1662, in _factorize_keys
    rlab = rizer.factorize(rk)
  File "pandas/_libs/hashtable.pyx", line 111, in pandas._libs.hashtable.Int64Factorizer.factorize
  File "stringsource", line 653, in View.MemoryView.memoryview_cwrapper
  File "stringsource", line 348, in View.MemoryView.memoryview.__cinit__
ValueError: buffer source array is read-only
"""

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-73-652c1db5701b> in <module>()
      1 num_cores = multiprocessing.cpu_count()
----> 2 results = Parallel(n_jobs=num_cores, prefer='processes', verbose = 5)(delayed(bootstrap)() for i in range(n_trials))
      3 #pd.DataFrame(results[0])

~/.local/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
    932 
    933             with self._backend.retrieval_context():
--> 934                 self.retrieve()
    935             # Make sure that we get a last message telling us we are done
    936             elapsed_time = time.time() - self._start_time

~/.local/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
    831             try:
    832                 if getattr(self._backend, 'supports_timeout', False):
--> 833                     self._output.extend(job.get(timeout=self.timeout))
    834                 else:
    835                     self._output.extend(job.get())

~/.local/lib/python3.6/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    519         AsyncResults.get from multiprocessing."""
    520         try:
--> 521             return future.result(timeout=timeout)
    522         except LokyTimeoutError:
    523             raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

ValueError: buffer source array is read-only

и код

def bootstrap():

    df_resample_ids = skl.utils.resample(ob_ids)
    df_resample_ids = pd.DataFrame(df_resample_ids).sort_values(by="0").reset_index(drop=True)
    df_resample_ids.columns = [ob_id_field]

    df_resample = pd.DataFrame(df_resample_ids.merge(df, on = ob_id_field))

    return df_resample

num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores, prefer='processes', verbose = 5)(delayed(bootstrap)() for i in range(n_trials))

Алгоритм будет создавать идентификаторы с измененной выборкой / заменой из переменной ID и использовать команду слияния для создания нового набора данных на основе идентификаторов с измененной выборкой и исходного набора данных, сохраненного в df. Если я вырежу подмножество исходного набора данных (где угодно), оставив менее ~ 12 тыс. Строк, то параллельный цикл завершится без ошибок и сделает все как положено.

В соответствии с запросом ниже приведен новый фрагмент для повторного создания структур данных и отражения основного подхода, над которым я сейчас работаю:

import pandas as pd
import sklearn as skl
import multiprocessing
from joblib import Parallel, delayed

df = pd.DataFrame(np.random.randn(200000, 24), columns=list('ABCDDEFGHIJKLMNOPQRSTUVW'))
df["ID"] = df.index.drop_duplicates().tolist() 
ob_ids = df.index.drop_duplicates().tolist() 

def bootstrap2():

    df_resample_ids = skl.utils.resample(ob_ids)
    df_resample_ids = pd.DataFrame(df_resample_ids).sort_values(by=0).reset_index(drop=True)
    df_resample_ids.columns = ['ID']
    df_resample = pd.DataFrame(df1.merge(df_resample_ids, on = 'ID'))

    result = df_resample

    return result

num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores, prefer='processes', verbose = 5)(delayed(bootstrap2)() for i in range(n_trials))

Однако я замечаю, что когда данные полностью состоят из случайных чисел np.racle, цикл проходит без ошибки. Типы исходного кадра данных:

start_rtg                        int64
end_rtg                        float64
days_diff                      float64
ultimate_customer_system_id      int64

Как я могу избежать ошибки только для чтения?

1 Ответ

0 голосов
/ 09 мая 2019

публикуя ответ на мой вопрос, я обнаружил, что одна из переменных имеет тип данных int64.когда я преобразовал все переменные в float64, ошибка исчезла.так что это проблема, которая ограничена только определенными типами данных ...

ура Стефан

...