У меня есть 2 процесса: первый, на котором я создаю глобальный распределенный клиент; второй процесс - это веб-скребок, который должен получить глобального клиента и отправить ему задачи, а когда все будет сделано, он отправит сообщение другому процессу, чтобы сообщить ему, что он может продолжить.
from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq
def get(url) -> dict:
# downloads data from url
time.sleep(3)
return data
def save(data) -> None:
# saves data locally
time.sleep(3)
return None
def scraper(urls):
# global client
client = get_client()
# zeromq socket
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:port')
while True:
for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
save(data=result)
socket.send_string('All job is done for this minute, proceed.')
sleep(60)
if __name__ == '__main__':
client = Client()
s = Process(target=scraper, *args, **kwargs)
s.start()
проблема в том, что из функции скребка я могу получить глобальный клиент (я вижу его правильно, если я его распечатаю), но я не могу представить ему какую-либо задачу. Консоль не печатает никаких ошибок, она просто зависает, ничего не делая. Я думаю, что причина в том, что функция скребка выполняется на многопроцессорной обработке saparate. Процесс.
Любое решение или обходной путь? Спасибо.