Запустите бесконечный процесс, который проверяет многопроцессорные очереди на обработку - PullRequest
0 голосов
/ 07 июня 2019

Программа состоит из трех частей: добавление данных в очереди, обработка данных в очередях и проверка, пуста ли очередь. Каждый, кроме Add (), все части имеют свой собственный процесс. Программа должна функционировать так, когда мы ее запускаем,

Он должен продолжать проверять очереди, если что-то в нем запускает функцию, если не продолжать работу.
В то же время асинхронные данные могут быть добавлены в очередь, и она должна выполнить задание, как указано ранее Одновременно из очереди может обрабатываться только одна вещь.

Я использую Windows, и я продолжаю получать

TypeError: can't pickle _thread.lock objects

Вот код

from multiprocessing import Queue,Process
from time import sleep
import threading
from multiprocessing.pool import ThreadPool
from multiprocessing import dummy as multithreading
import concurrent.futures

# import queue
class A(object):
    def __init__(self, *args, **kwargs):
        self.Q = Queue()

#Add data to queue: should be accessable all time
    def Add(self, i):
        # q = Queue()
        self.Q.put(threading.Thread(target=self.printAns(i)))

#Processes the data: runs only upon call
    def printAns(self,name):
        print("Name to print is: ",name)
        return 'completed'

#This function call printANS as a process
    def jobRun(self):
        # job = self.Q.get()
        # ans = Queue()
        jobThread = self.Q.get()
        async_result = jRPool.apply_async(jobThread)
        print(async_result.get())

#Checks if the queue has anything: checker functions needs to run constantly
    def checkQueue(self):
        while True:
            if self.Q.empty():
                pass
            else:
                return True

#should initiate call to checker upon success calls jobRun() as a process and go back to checking
    def run(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            checkfunc = executor.map(self.checkQueue)
            while True:
                if checkfunc:
                    sleep(1)
                    executor.map(self.jobRun)
                    self.Q.close()  




if __name__ == '__main__':
    a = A()
    a.Add("test1")
    a.Add("test2")
    a.run()
    # a.Add("this")
    while True:
        data = input("Enter a string: ")
        a.Add(data)

Любая помощь очень ценится. Я догадываюсь, что это связано с замками или семафорами.

1 Ответ

0 голосов
/ 07 июня 2019

Что-то подобное может сработать, если я правильно понял ваши требования.

  • compute - это функция, вызываемая в удаленном процессе. Прямо сейчас он просто печатает PID работника, обрабатывающего вещи, и печатает каждую букву строки. Это позволяет видеть процессы, работающие параллельно.
  • tasks - это список объектов задач, которые поток poll_results должен отслеживать.
  • poll_results - цель threading.Thread, которая выполняется в основном процессе; он занят зацикливанием задач в tasks, распечатывая значение результата, как только они будут готовы.
import os
import time
import threading
from multiprocessing import Pool


def compute(value):
    for ch in value:
        print(os.getpid(), ch)
        time.sleep(0.05)
    return (value, len(value))


tasks = []


def poll_results():
    while True:
        for task in tasks[:]:
            if task.ready():
                print("Task finished:", task.get())
                tasks.remove(task)


def main():
    poller_thread = threading.Thread(target=poll_results)
    poller_thread.start()

    with Pool() as p:
        t1 = p.apply_async(compute, ("hello",))
        t2 = p.apply_async(compute, ("world",))
        # Wait for the first results before entering the loop
        t1.wait()
        t2.wait()

        while True:
            data = input("Enter a string: ")
            tasks.append(p.apply_async(compute, (data,)))


if __name__ == "__main__":
    main()

Вывод здесь что-то вроде

65755 h
65756 w
65755 e
65756 o
65755 l
65756 r
65755 l
65756 l
65755 o
65756 d
Enter a string: Hello, world!
Enter a string: 65757 H
65757 e
65757 l
65757 l
65757 o
65757 ,
65757
65757 w
65757 o
65757 r
65757 l
65757 d
65757 !
Task finished: ('Hello, world!', 13)

Обратите внимание на то, как чередуется ввод-вывод (вы получаете второе приглашение «Ввести строку» еще до того, как задача обрабатывается, и эти два сотрудника распечатывают письма «привет» и «мир», когда могут).

...