многопоточные процессы в python, использующие очередь для записи в файл, проверяя, была ли выполнена работа - PullRequest
0 голосов
/ 10 апреля 2020
from multiprocessing.dummy import Pool as ThreadPool
import multiprocessing as mp



def func(a):

    pthData = "C:/temp/temp.txt"
    with open(pthData, 'r') as file:
        done = file.read().splitlines()

    if a in done:
        return 'done'

    q.put(a)
    return a

def listener(q):

    pthData = "C:/temp/temp.txt"
    m = q.get()
    with open(pthData, 'a') as the_file:
        the_file.write( m + '\n')
        #he_file.write(str(m) + '\n')


a =  ['a', 'b', 'c', 'd', 'a', 'b']


# Make the Pool of workers
pool = ThreadPool(4)

#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()    

#put listener to work first
watcher = pool.apply_async(listener, (q,))

pool.starmap(func, a, q)
## TypeError: '<=' not supported between instances of 'AutoProxy[Queue]' and 'int'

pool.starmap(func, a)
## Runs but only writes 'a' to temp file

pool.starmap(func, (a, q))
## func() takes 1 positional argument but 6 were given

pool.apply_async(func, (a, q))
## freezes on pool.join

# Close the pool and wait for the work to finish
pool.close()
pool.join()

Почему apply_asyn c зависает в pool.join ()? Я попытался поместить его в if name == ' main ', но результат был тот же.

Как правильно вызвать func, передав 1 аргумент (а) и очередь (q)?

1 Ответ

1 голос
/ 10 апреля 2020

Как правильно назвать fun c передачей 1 аргумента (a) и очереди (q)?

Это как минимум не freeze :

  • Убедитесь, что temp.txt существует до выполнения.
  • Добавьте параметр q к func.
      def func(a,q):
          print(f'func({a})')
          ...
  • Используйте apply_async в понимании списка.
    if __name__ == '__main__':

        # Make the Pool of workers
        with ThreadPool(4) as pool:
            q = queue.Queue()
            #put listener to work first
            watcher = pool.apply_async(listener, (q,))
            results = [pool.apply_async(func, (item, q)) for item in a]
            # just check stuff
            for result in results:
                result.wait()
                print(result, result.successful(),result.get())
            pool.close()
            pool.join()

  • Вам нужно будет решить некоторые другие проблемы, такие как listener, запущенный один раз и затем остановившийся.
  • Многие другие способы сделать это, я использовал apply_async, потому что это был один из вариантов в вашем вопросе.
  • Мне нравится самому использовать concurrent.futures.
  • Вы может быть полезно прочитать результаты поиска, используя варианты python threading producer consumer site:stackoverflow.com
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...