Concurrent.futures: потоки против процессов - PullRequest
0 голосов
/ 29 апреля 2018

Хотя я нашел несколько гипотетических и теоретических постов по этому вопросу, самое близкое, что я нашел, это здесь , и опубликованный ответ имеет дело с противоположностью того, что, как мне кажется, я ищу справка (на случай, если эта ссылка кому-нибудь поможет).

Я получил следующий код из вики на Github, здесь . Его реализация казалась довольно простой, однако я не смог использовать его в своем родном виде.

Вот мой код «Процесс», который я использую:

import dask.dataframe as dd

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import gdelt

gd = gdelt.gdelt(version=2)

e = ProcessPoolExecutor()

def getter(x):
    try:
        date = x.strftime('%Y%m%d')
        d = gd.Search(date, coverage=True)
        d.to_csv("{}_gdeltdata.csv".format(date),encoding='utf-8',index=False)
    except:
        pass

results = list(e.map(getter,pd.date_range('2015 Apr 21','2018 Apr 21')))

Вот полная ошибка:

BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-1-874f937ce512> in <module>()
     21 
     22 # now pull the data; this will take a long time
---> 23 results = list(e.map(getter,pd.date_range('2015 Apr 21','2018 Apr 21')))
     24 
     25 

C:\Anaconda3\lib\concurrent\futures\process.py in_chain_from_iterable_of_lists(iterable)
    364     careful not to keep references to yielded objects.
    365     """
--> 366     for element in iterable:
    367         element.reverse()
    368         while element:

C:\Anaconda3\lib\concurrent\futures\_base.py in result_iterator()
    584                     # Careful not to keep a reference to the popped future
    585                     if timeout is None:
--> 586                         yield fs.pop().result()
    587                     else:
    588                         yield fs.pop().result(end_time - time.time())

C:\Anaconda3\lib\concurrent\futures\_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

C:\Anaconda3\lib\concurrent\futures\_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

*BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.*

Есть какие-нибудь идеи относительно того, как решить эту ошибку? Я знаю, что если я заменю ProcessPoolExecutor на ThreadPoolExecutor, проблема, похоже, будет решена (хотя я не прошел весь набор данных до конца (поэтому я не могу быть полностью уверен), однако, я полагаю, что у меня будет более быстрый результат, если я использую ProcessPoolExecutor.

В конечном итоге я буду использовать dask для работы с данными в Pandas. Заранее спасибо.

1 Ответ

0 голосов
/ 12 июля 2018

Примеры в документации всегда показывают исполнение в и if __name__ == '__main__' предложении. Надеюсь, этот mcve точно имитирует ваш сценарий использования

def gd(s):
    return s*3

def getter(w):
    return gd(w)

data = list('abcdefg')

def main():
    with ProcessPoolExecutor(max_workers=4) as executor:
        for thing in executor.map(getter, data):
            print(thing)

Выполнено, как это работает,

#main()
if __name__ == '__main__':
    main()

Но выполнение так не происходит - выдает BrokenProcessPool ошибку

main()
if __name__ == '__main__':
    #main()

Попробуйте убедиться, что строка results = list(e.map(getter,pd.date_range(...))) работает в *__main__* процессе

...