Python или параллельный генератор DASK? - PullRequest
0 голосов
/ 16 ноября 2018

Возможно ли в python (возможно, с помощью dask, может быть, с помощью многопроцессорной обработки) "разместить" генераторы на ядрах, а затем, параллельно, пройтись по генераторам и обработать результаты?

Это должны быть генераторы в частности (или объекты с __iter__); списки всех найденных элементов, которые генерируют генераторы, не помещаются в память.

В частности:

С пандами я могу вызвать read_csv(...iterator=True), что дает мне итератор (TextFileReader) - я могу for in это или явно вызвать следующий несколько раз. Весь CSV никогда не будет прочитан в память. Ницца.

Каждый раз, когда я читаю следующий фрагмент из итератора, я также выполняю некоторые дорогостоящие вычисления для него.

Но теперь у меня есть 2 таких файла. Я хотел бы создать 2 таких генератора и «разместить» 1 на одном ядре и 1 на другом, чтобы я мог:

 result = expensive_process(next(iterator))

на каждом ядре, параллельно, а затем объединить и вернуть результат. Повторяйте этот шаг до тех пор, пока один или оба генератора не выйдут из строя.

Похоже, что TextFileReader не может быть рассортирован и не является генератором. Я не могу узнать, как это сделать в dask или многопроцессорной. Есть шаблон для этого?

Ответы [ 2 ]

0 голосов
/ 19 ноября 2018

Так что, к счастью, я думаю, что эта проблема прекрасно отображается в многопроцессорной обработке Python .Process и .Queue.

def data_generator(whatever):
   for v in something(whatever):
      yield v

def generator_constructor(whatever):
   def generator(outputQueue):
      for d in data_generator(whatever):
         outputQueue.put(d)
      outputQueue.put(None) # sentinel
   return generator

def procSumGenerator():
   outputQs = [Queue(size) for _ in range(NumCores)]
   procs = [Process(target=generator_constructor(whatever),
                    args=(outputQs[i],))
            for i in range(NumCores)] 

   for proc in procs: proc.start()

   # until any output queue returns a None, collect 
   # from all and yield
   done = False
   while not done:
      results = [oq.get() for oq in outputQs]
      done = any(res is None for res in results)
      if not done:
         yield some_combination_of(results)

   for proc in procs: terminate()

for v in procSumGenerator():
   print(v)

Может быть, это можно сделать лучше с помощью Dask?Я считаю, что мое решение довольно быстро насыщает сеть большими объемами генерируемых данных - я манипулирую csvs пандами и возвращаю большие массивы.

https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb

0 голосов
/ 18 ноября 2018

Dask read_csv предназначен для загрузки данных из нескольких файлов в чанках с размером чанка, который вы можете указать.Когда вы оперируете с результирующим фреймом данных, вы будете работать по частям, что и является точкой с самого начала использования Dask.Не должно быть необходимости использовать метод итератора.

Метод dask dataframe, который вы захотите использовать, скорее всего, будет map_partitions().

Если вы действительно Если вы хотите использовать идею итератора, вы должны взглянуть на dask.delayed, который способен распараллеливать произвольные функции python, посылая каждый вызов функции (с разными именами файлов для каждого) вашим работникам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...