Невозможно отправить несколько аргументов в concurrrent.futures.Executor.map () - PullRequest
0 голосов
/ 07 июня 2019

Я пытаюсь объединить решения, представленные в обоих ответах SO - Использование потоков для разбиения массива на куски и выполнения вычислений для каждого чанка и повторной сборки возвращаемых массивов в один массив и Передайте несколько параметров в concurrent.futures.Executor.map?.У меня есть пустой массив, который я разделяю на сегменты, и я хочу, чтобы каждый кусок отправлялся в отдельный поток, а также дополнительный аргумент для отправки вместе с фрагментом исходного массива.Этот дополнительный аргумент является константой и не изменится.ExecuteCalc - это функция, которая будет принимать два аргумента - один фрагмент исходного массива numpy и константу.

Первое решение, которое я попробовал

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)

    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]

    n_jobs = psutil.cpu_count(logical=False)

    chunk = np.array_split(grid_points,n_jobs,axis=0)


   x = ThreadPoolExecutor(max_workers=n_jobs) 
   maxDistance = 4.3
   func = partial(performCalc,chunk)
   args = [chunk,maxDistance]
   # This prints 4.3 twice although there are four cores in the system
   results = x.map(func,args)
   # This prints 4.3 four times correctly
   results1 = x.map(performTest,chunk)

  def performCalc(chunk,maxDistance):
      print(maxDistance)
      return chunk

 def performTest(chunk):
     print("test")

 main()

Так что executeCalc () печатает 4,3 дважды, хотя число ядер в системе равно 4. В то время как executeTest () печатает тест четыре раза правильно.Я не могу выяснить причину этой ошибки.

Также я уверен, что способ установки вызова для itertools.partial неверен.

1) Существует четыре блока исходного массива numpy.

2) Каждый блок должен быть связан с maxDistance и отправлен в executeCalc ()

3)будет четыре потока, которые будут печатать maxDistance и возвращать части общего результата, которые будут возвращены в одном массиве

Куда я иду не так?

ОБНОВЛЕНИЕ

Я также пытался использовать лямбда-подход

results = x.map(lambda p:performCalc(*p),args)

, но это ничего не печатает.

1 Ответ

0 голосов
/ 08 июня 2019

Используя решение, предоставленное пользователем mkorvas, как показано здесь - Как передать функцию с более чем одним аргументом в python concurrent.futures.ProcessPoolExecutor.map ()? Я смог решить свою проблему как показано в решении здесь -

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
   testThread()

def testThread():

   minLat = -65.76892
   maxLat =  66.23587
   minLon =  -178.81404
   maxLon =  176.2949
   latGrid = np.arange(minLat,maxLat,0.05)
   lonGrid = np.arange(minLon,maxLon,0.05)
   print(latGrid.shape,lonGrid.shape)
   gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
   grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
   print(grid_points.shape)
   n_jobs = psutil.cpu_count(logical=False)
   chunk = np.array_split(grid_points,n_jobs,axis=0)
   x = ThreadPoolExecutor(max_workers=n_jobs) 


  maxDistance = 4.3
  func = partial(performCalc,maxDistance)

  results = x.map(func,chunk)


 def performCalc(maxDistance,chunk):

     print(maxDistance)
     return chunk

main()

Что, по-видимому, нужно сделать (и я не знаю, почему и, может быть, кто-то может уточнить в другом ответе), это то, что вам нужно переключить порядок ввода на функцию executeCalc ()

как показано здесь -

      def performCalc(maxDistance,chunk):

          print(maxDistance)
          return chunk
...